### Import Libraries

In [61]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

### Create a spark session

In [21]:
spark = SparkSession.builder\
        .appName("MySparkAppplication")\
        .getOrCreate()
    

### Read CSV file

In [22]:
csv_path = 'C:\\Users\\ajith\\data_spark_operations\\file2.csv'

df_csv = spark.read.format("csv") \
        .option("header",True) \
        .option("inferSchema","true") \
        .option("mode","permissive") \
        .load(csv_path)

display(df_csv)

DataFrame[Series_reference: string, Period: double, Data_value: double, Suppressed: string, STATUS: string, UNITS: string, Magnitude: int, Subject: string, Group: string, Series_title_1: string, Series_title_2: string, Series_title_3: string, Series_title_4: string, Series_title_5: string]

### Read a JSON file

In [23]:
json_path = "C:\\Users\\ajith\\data_spark_operations\\file1.json"

df_json = spark.read.format("json") \
         .option("multiline",True) \
         .option("mode","DROPMALFORMED") \
         .load(json_path)
df_json =  df_json.selectExpr("explode(people) as person").select("person.*")

df_json.show()

+---+---------+------+--------+----------+
|age|firstName|gender|lastName|    number|
+---+---------+------+--------+----------+
| 28|      Joe|  male| Jackson|7349282382|
| 32|    James|  male|   Smith|5678568567|
| 24|    Emily|female|   Jones| 456754675|
+---+---------+------+--------+----------+



### Read a Parquet file

In [73]:
parquet_path = "C:\\Users\\ajith\\data_spark_operations\\file3.parquet"

df_parquet = spark.read.format("parquet") \
             .load(parquet_path)

display(df_parquet)

df_parquet.show(5)



DataFrame[PassengerId: bigint, Survived: bigint, Pclass: bigint, Name: string, Sex: string, Age: double, SibSp: bigint, Parch: bigint, Ticket: string, Fare: double, Cabin: string, Embarked: string]

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

### Operations on Dataframe

In [52]:
# Filter, WithColumn, If else(When)

df_filter = df_parquet.filter(col("Sex")=="male")
df_withCol = df_parquet.withColumn("Todays date",current_date()) \
                       .withColumn("First name",trim(split(col("Name"),",")[1])) \
                       .withColumn("age",when(col("age").isNull(),lit(10)).otherwise(col("age"))) \
                       .withColumn("Flag",when(col("age")<25,"Minor").otherwise("Major")) \
                       .select("age","`Todays date`","`First name`","Flag") \
                       .orderBy(col("age").desc())

df_withCol.show(5)

+----+-----------+--------------------+-----+
| age|Todays date|          First name| Flag|
+----+-----------+--------------------+-----+
|80.0| 2025-03-10|Mr. Algernon Henr...|Major|
|74.0| 2025-03-10|           Mr. Johan|Major|
|71.0| 2025-03-10|           Mr. Ramon|Major|
|71.0| 2025-03-10|        Mr. George B|Major|
|70.5| 2025-03-10|         Mr. Patrick|Major|
+----+-----------+--------------------+-----+
only showing top 5 rows



In [59]:
# Aggregate functions on dataframe
# Count of male and female
df_cnt = df_parquet.groupBy("Sex").agg(count("PassengerId").alias("Cnt")).select("Sex","Cnt").orderBy(col("Sex").desc())
df_cnt.show()

# Total avg age for survived passengers
df_fare = df_parquet.filter(col("Survived")==1).agg(round(avg("age"),2).alias("age")).select("*")
df_fare.show()

+------+---+
|   Sex|Cnt|
+------+---+
|  male|577|
|female|314|
+------+---+

+-----+
|  age|
+-----+
|28.34|
+-----+



In [71]:
# Window functions
window = Window.partitionBy("Sex").orderBy("age")

df_window = df_parquet.withColumn("age",when(col("age").isNull(),lit(10)).otherwise(col("age"))) \
                      .withColumn("row_number",row_number().over(window)) \
                      .withColumn("rank",rank().over(window)) \
                      .withColumn("dense_rank",dense_rank().over(window)) \
                      .select("Sex","Age","row_number","rank","dense_rank") \
                      .orderBy(col("row_number").asc())

df_window.show(5)

+------+----+----------+----+----------+
|   Sex| Age|row_number|rank|dense_rank|
+------+----+----------+----+----------+
|  male|0.42|         1|   1|         1|
|female|0.75|         1|   1|         1|
|female|0.75|         2|   1|         1|
|  male|0.67|         2|   2|         2|
|female| 1.0|         3|   3|         2|
+------+----+----------+----+----------+
only showing top 5 rows



In [74]:
# Joins


+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS| UNITS|Magnitude|             Subject|               Group|Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+------+---------+--------------------+--------------------+--------------+--------------------+--------------+--------------+--------------+
|     BDCQ.SEA1AA|2011.06|   80078.0|      null|     F|Number|        0|Business Data Col...|Industry by emplo...|   Filled jobs|Agriculture, Fore...|        Actual|          null|          null|
|     BDCQ.SEA1AA|2011.09|   78324.0|      null|     F|Number|        0|Business Data Col...|Industry by emplo...|   Filled jobs|Agriculture, Fore...|        Actual|          null|          null|
|     BDCQ.SEA1AA|20

In [26]:
# Write dataframe as  file

df_parquet.coalesce(1).write \
        .mode("overwrite") \
        .format("csv") \
        .save("C:\\Users\\ajith\\data_spark_operations\\parquet_file3.csv")

df_parquet.repartition(10).write \
        .mode("overwrite") \
        .partitionBy("Sex").format("parquet") \
        .save("C:\\Users\\ajith\\data_spark_operations\\parquet_file3.parquet")