In [None]:
# Print the current working directory of the notebook runtime.
!pwd

# List the files in the current working directory.
!ls

# List /usr/lib/jvm/ to check which OpenJDK versions are already present on Colab.
!ls /usr/lib/jvm/

In [None]:
# Refresh the apt package index before installing.
!apt-get update
# Install the headless OpenJDK 8 JDK; -qq and the redirect of stdout to /dev/null
# keep the cell output quiet.
# NOTE(review): no visible cell sets JAVA_HOME, so Spark may end up running on the
# system-default Java rather than this JDK 8 - confirm which JVM is intended.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Download Apache Spark binary
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Unzip file
!tar -xvzf spark-3.5.0-bin-hadoop3.tgz

In [None]:
!pip install -q findspark
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Create the notebook's Spark session (or reuse one that already exists in this
# kernel). master("local") runs Spark locally inside the notebook runtime.
spark = (
    SparkSession.builder
    .appName("Hands-on PySpark on Google Colab")
    .master("local")
    .getOrCreate()
)

In [None]:
# Load the California housing training CSV from the runtime's sample_data folder.
# The first line of the file is treated as the header row. No schema inference is
# requested, so Spark reads every column as a string.
spark_data = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("/content/sample_data/california_housing_train.csv")
)

# Preview the first five rows without truncating long cell values.
spark_data.show(n=5, truncate=False)

In [None]:
from pyspark.sql import SparkSession
# FloatType is no longer used by this cell; it stays imported so any other cell
# that relied on the name being in the kernel namespace keeps working.
from pyspark.sql.types import DoubleType, FloatType, IntegerType, StructField, StructType
# Import Spark's sum under an alias: importing it as plain `sum` would shadow
# Python's built-in sum() for every cell that runs afterwards.
from pyspark.sql.functions import count, desc, sum as spark_sum

# getOrCreate() hands back the session that is already running in this kernel
# (or starts one on a fresh kernel), so a single call per cell is sufficient.
spark = SparkSession.builder.appName("grocery_analysis").getOrCreate()

# (User_ID, Amount_Paid) - one tuple per store visit.
data = [
    (101, 200.5),
    (102, 100.2),
    (103, 50.5),
    (101, 75.3),
    (102, 150.2),
]

# Amount_Paid is a DoubleType column: Python floats are 64-bit, and storing them
# in a 32-bit FloatType column rounds each value, which then shows up as
# artefacts such as 275.8000030517578 in the aggregated totals.
schema = StructType([
    StructField("User_ID", IntegerType(), True),
    StructField("Amount_Paid", DoubleType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)
df.show()

# Per user: total amount paid and number of visits, most frequent visitors first.
# User_ID is a secondary sort key so users with the same visit count always come
# out in the same order from run to run.
result_df = (
    df.groupBy("User_ID")
    .agg(
        spark_sum("Amount_Paid").alias("Total_Amount_Paid"),
        count("User_ID").alias("Number_of_Visits"),
    )
    .orderBy(desc("Number_of_Visits"), "User_ID")
)
result_df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("data_operations").getOrCreate()

# --- Employee records: (Name, Age, Department); "Alice" appears twice on purpose.
data1 = [
    ("Alice", 25, "Engineering"),
    ("Bob", 30, "Finance"),
    ("Charlie", 22, "Marketing"),
    ("Alice", 25, "Engineering"),
    ("David", 28, "Human Resources"),
]

schema1 = (
    StructType()
    .add(StructField("Name", StringType(), True))
    .add(StructField("Age", IntegerType(), True))
    .add(StructField("Department", StringType(), True))
)

df1 = spark.createDataFrame(data=data1, schema=schema1)

# --- 1) Shape of the raw frame (the row count still includes the duplicate).
print("1) Number of Columns and Rows:")
print("   Columns:", len(df1.columns))
print("   Rows:", df1.count())

# --- 2) Drop fully identical rows; df1 refers to the de-duplicated frame from here on.
df1 = df1.dropDuplicates()
print("\n2) DataFrame after removing duplicates:")
df1.show()

# --- 3) Keep only a subset of the columns.
columns_to_crop = ["Name", "Age"]
cropped_df1 = df1.select(*columns_to_crop)
print("\n3) Cropped DataFrame:")
cropped_df1.show()

# --- 4) Job titles keyed by Name, inner-joined onto the employee frame.
data2 = [
    ("Alice", "Project Manager"),
    ("Bob", "Financial Analyst"),
    ("Charlie", "Marketing Specialist"),
    ("David", "HR Manager"),
]

schema2 = (
    StructType()
    .add(StructField("Name", StringType(), True))
    .add(StructField("Job_Title", StringType(), True))
)

df2 = spark.createDataFrame(data=data2, schema=schema2)

joined_df = df1.join(df2, on="Name", how="inner")
print("\n4) Joined DataFrame:")
joined_df.show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

sns.set(style="whitegrid")

# Bring the Age column of df1 over to pandas so seaborn can plot it.
age_distribution = df1.select("Age").toPandas()

# Histogram of ages (20 bins) with a KDE curve overlaid, on an 8x6 inch figure.
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(data=age_distribution, x="Age", bins=20, kde=True, ax=ax)
plt.show()