## Basic Structured Operations


### Step 1: Initialize PySpark Session


In [27]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("day2").getOrCreate()


### Step 2: Load the Dataset


In [28]:
# Load the Chipotle dataset into a Spark DataFrame
data_path = "occupation.csv"  # Replace with the actual path
occupation = spark.read.csv(data_path, header=True, inferSchema=True)


                                                                                

In [3]:
occupation.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)



### Additional questions:

Use both spark SQL and Pyspark to obtain answer wherever relevant

#### Filter out rows where the age is greater than 30 and create a new DataFrame. Then, add a new column named "is_elderly" with a value of "True" for these rows and "False" otherwise.Rename the "gender" column to "sex".

In [30]:
occupation.createOrReplaceTempView("occupation")

In [4]:
# Spark SQL

# This part of code is for the first part of question - filter out of age>30
filtered_query = spark.sql("""
                select * from occupation o
                where o.age>30
""")
                  
# filtered_query.show()

# This part of code is for the second part to add new column that returns true or false in is_elderly column based on age
is_elderly_query = spark.sql("""
                select user_id,age,gender as sex, occupation,zip_code,
                case
                    when age>30 then 'true'
                    else 'false' 
                end as is_elderly
                from occupation
 """)

is_elderly_query.show()

+-------+---+---+-------------+--------+----------+
|user_id|age|sex|   occupation|zip_code|is_elderly|
+-------+---+---+-------------+--------+----------+
|      1| 24|  M|   technician|   85711|     false|
|      2| 53|  F|        other|   94043|      true|
|      3| 23|  M|       writer|   32067|     false|
|      4| 24|  M|   technician|   43537|     false|
|      5| 33|  F|        other|   15213|      true|
|      6| 42|  M|    executive|   98101|      true|
|      7| 57|  M|administrator|   91344|      true|
|      8| 36|  M|administrator|   05201|      true|
|      9| 29|  M|      student|   01002|     false|
|     10| 53|  M|       lawyer|   90703|      true|
|     11| 39|  F|        other|   30329|      true|
|     12| 28|  F|        other|   06405|     false|
|     13| 47|  M|     educator|   29206|      true|
|     14| 45|  M|    scientist|   55106|      true|
|     15| 49|  F|     educator|   97301|      true|
|     16| 21|  M|entertainment|   10309|     false|
|     17| 30

In [7]:
# Pyspark
from pyspark.sql.functions import when

filteredd = occupation.filter(occupation.age>30)
age_greater = occupation.withColumn("is_elderly",
    when(occupation.age>30,"true")
    .otherwise("false")
).withColumnRenamed("gender",'sex')
age_greater.show()

+-------+---+---+-------------+--------+----------+
|user_id|age|sex|   occupation|zip_code|is_elderly|
+-------+---+---+-------------+--------+----------+
|      1| 24|  M|   technician|   85711|     false|
|      2| 53|  F|        other|   94043|      true|
|      3| 23|  M|       writer|   32067|     false|
|      4| 24|  M|   technician|   43537|     false|
|      5| 33|  F|        other|   15213|      true|
|      6| 42|  M|    executive|   98101|      true|
|      7| 57|  M|administrator|   91344|      true|
|      8| 36|  M|administrator|   05201|      true|
|      9| 29|  M|      student|   01002|     false|
|     10| 53|  M|       lawyer|   90703|      true|
|     11| 39|  F|        other|   30329|      true|
|     12| 28|  F|        other|   06405|     false|
|     13| 47|  M|     educator|   29206|      true|
|     14| 45|  M|    scientist|   55106|      true|
|     15| 49|  F|     educator|   97301|      true|
|     16| 21|  M|entertainment|   10309|     false|
|     17| 30

#### Calculate the average age of male and female users separately. Present the result in a new DataFrame with columns "gender" and "avg_age".

In [9]:
# Spark SQL
averagAgeSql = spark.sql("""
                select gender, avg(age) as avg_age
                from occupation group by gender
""")

averagAgeSql.show()

+------+------------------+
|gender|           avg_age|
+------+------------------+
|     F| 33.81318681318681|
|     M|34.149253731343286|
+------+------------------+



In [11]:
# Pyspark
from pyspark.sql.functions import avg
average_age = occupation.groupBy("gender").agg(avg("age").alias("avg_age"))
average_age.show()

+------+------------------+
|gender|           avg_age|
+------+------------------+
|     F| 33.81318681318681|
|     M|34.149253731343286|
+------+------------------+



#### Add a new column named "full_name" to the dataset by concatenating the "user_id" and "occupation" columns. Then, rename the "zip_code" column to "postal_code" in the same DataFrame.

In [14]:
# Spark SQL
spark.sql("""
            select user_id,age,gender,occupation,zip_code as postal_code, concat(user_id,occupation) as full_name
            from occupation
            """).show()


+-------+---+------+-------------+-----------+---------------+
|user_id|age|gender|   occupation|postal_code|      full_name|
+-------+---+------+-------------+-----------+---------------+
|      1| 24|     M|   technician|      85711|    1technician|
|      2| 53|     F|        other|      94043|         2other|
|      3| 23|     M|       writer|      32067|        3writer|
|      4| 24|     M|   technician|      43537|    4technician|
|      5| 33|     F|        other|      15213|         5other|
|      6| 42|     M|    executive|      98101|     6executive|
|      7| 57|     M|administrator|      91344| 7administrator|
|      8| 36|     M|administrator|      05201| 8administrator|
|      9| 29|     M|      student|      01002|       9student|
|     10| 53|     M|       lawyer|      90703|       10lawyer|
|     11| 39|     F|        other|      30329|        11other|
|     12| 28|     F|        other|      06405|        12other|
|     13| 47|     M|     educator|      29206|     13ed

In [17]:
# Pyspark
from pyspark.sql.functions import col,concat

full_name_question = occupation.withColumn("full_name",concat(col("user_id"),col("occupation"))).withColumnRenamed("zip_code","postal_code")
full_name_question.show()

+-------+---+------+-------------+-----------+---------------+
|user_id|age|gender|   occupation|postal_code|      full_name|
+-------+---+------+-------------+-----------+---------------+
|      1| 24|     M|   technician|      85711|    1technician|
|      2| 53|     F|        other|      94043|         2other|
|      3| 23|     M|       writer|      32067|        3writer|
|      4| 24|     M|   technician|      43537|    4technician|
|      5| 33|     F|        other|      15213|         5other|
|      6| 42|     M|    executive|      98101|     6executive|
|      7| 57|     M|administrator|      91344| 7administrator|
|      8| 36|     M|administrator|      05201| 8administrator|
|      9| 29|     M|      student|      01002|       9student|
|     10| 53|     M|       lawyer|      90703|       10lawyer|
|     11| 39|     F|        other|      30329|        11other|
|     12| 28|     F|        other|      06405|        12other|
|     13| 47|     M|     educator|      29206|     13ed

#### Filter out rows where occupation is 'technician', select only the "user_id" and "age" columns, and then add a new column "age_diff" that calculates the difference between the user's age and the average age in the dataset.

In [31]:
# Spark SQL
average_age = spark.sql("SELECT AVG(age) AS avg_age FROM occupation").collect()[0]["avg_age"]

spark.sql("SELECT user_id, age, age - {} AS age_diff FROM occupation WHERE occupation = 'technician'"
          .format(average_age)) \
    .show()


+-------+---+------------------+
|user_id|age|          age_diff|
+-------+---+------------------+
|      1| 24|-10.05196182396607|
|      4| 24|-10.05196182396607|
|     44| 26| -8.05196182396607|
|     77| 30| -4.05196182396607|
|    143| 42|  7.94803817603393|
|    197| 55| 20.94803817603393|
|    244| 28| -6.05196182396607|
|    294| 34| -0.05196182396607|
|    311| 32| -2.05196182396607|
|    325| 48| 13.94803817603393|
|    441| 50| 15.94803817603393|
|    456| 24|-10.05196182396607|
|    458| 47| 12.94803817603393|
|    488| 48| 13.94803817603393|
|    545| 27| -7.05196182396607|
|    670| 30| -4.05196182396607|
|    715| 21|-13.05196182396607|
|    717| 24|-10.05196182396607|
|    718| 42|  7.94803817603393|
|    738| 35|  0.94803817603393|
+-------+---+------------------+
only showing top 20 rows



In [32]:
# Pyspark
average_age = occupation.select(avg("age")).collect()[0][0]

age_diff_df = occupation.filter(col("occupation") == "technician") \
                       .select("user_id", "age") \
                       .withColumn("age_diff", col("age") - average_age)

age_diff_df.show()


+-------+---+-------------------+
|user_id|age|           age_diff|
+-------+---+-------------------+
|      1| 24|-10.051961823966067|
|      4| 24|-10.051961823966067|
|     44| 26| -8.051961823966067|
|     77| 30| -4.051961823966067|
|    143| 42|  7.948038176033933|
|    197| 55| 20.948038176033933|
|    244| 28| -6.051961823966067|
|    294| 34|-0.0519618239660673|
|    311| 32|-2.0519618239660673|
|    325| 48| 13.948038176033933|
|    441| 50| 15.948038176033933|
|    456| 24|-10.051961823966067|
|    458| 47| 12.948038176033933|
|    488| 48| 13.948038176033933|
|    545| 27| -7.051961823966067|
|    670| 30| -4.051961823966067|
|    715| 21|-13.051961823966067|
|    717| 24|-10.051961823966067|
|    718| 42|  7.948038176033933|
|    738| 35| 0.9480381760339327|
+-------+---+-------------------+
only showing top 20 rows



#### Divide the dataset into two DataFrames: one with male users and another with female users. Repartition both DataFrames to have 2 partitions each. Then, union these two DataFrames together and display the resulting DataFrame.

In [20]:
male_df = spark.sql("""
                    select * from occupation
                    where gender = "M"
                     """).repartition(2)
                
male_df.show()
num_partitions = male_df.rdd.getNumPartitions()
print(num_partitions)

+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|    896| 28|     M|    writer|   91505|
|    156| 25|     M|  educator|   08360|
|    568| 39|     M|  educator|   01915|
|    624| 19|     M|   student|   30067|
|    832| 24|     M|technician|   77042|
|    684| 28|     M|   student|   55414|
|    905| 27|     M|     other|   30350|
|    148| 33|     M|  engineer|   97006|
|    313| 41|     M| marketing|   60035|
|    478| 29|     M|     other|   10019|
|    332| 20|     M|   student|   40504|
|    492| 57|     M|  educator|   94618|
|    833| 34|     M|    writer|   90019|
|    470| 24|     M|programmer|   10021|
|     21| 26|     M|    writer|   30068|
|    265| 26|     M| executive|   84601|
|     33| 23|     M|   student|   27510|
|    133| 53|     M|  engineer|   78602|
|    682| 23|     M|programmer|   55128|
|    650| 42|     M|  engineer|   83814|
+-------+---+------+----------+--------+
only showing top

In [21]:
female_df = spark.sql("""
                    select * from occupation
                    where gender = "F"
                     """).repartition(2)
                
female_df.show()
num_partitions = female_df.rdd.getNumPartitions()
print(num_partitions)

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|    505| 27|     F|        other|   20657|
|    241| 26|     F|      student|   20001|
|    629| 46|     F|        other|   44224|
|    482| 18|     F|      student|   40256|
|    304| 22|     F|      student|   71701|
|    147| 40|     F|    librarian|   02143|
|    354| 29|     F|    librarian|   48197|
|    588| 18|     F|      student|   93063|
|    175| 26|     F|    scientist|   21911|
|    490| 29|     F|       artist|   V5A2B|
|    457| 33|     F|     salesman|   30011|
|    165| 20|     F|        other|   53715|
|    342| 25|     F|        other|   98006|
|    401| 46|     F|   healthcare|   84107|
|    681| 44|     F|    marketing|   97208|
|    238| 42|     F|administrator|   44124|
|     52| 18|     F|      student|   55105|
|    556| 35|     F|     educator|   30606|
|    485| 44|     F|     educator|   95821|
|    126| 28|     F|       lawye

In [26]:
resulting_df = male_df.union(female_df)
resulting_df.show()


+-------+---+------+----------+--------+
|user_id|age|gender|occupation|zip_code|
+-------+---+------+----------+--------+
|    896| 28|     M|    writer|   91505|
|    156| 25|     M|  educator|   08360|
|    568| 39|     M|  educator|   01915|
|    624| 19|     M|   student|   30067|
|    832| 24|     M|technician|   77042|
|    684| 28|     M|   student|   55414|
|    905| 27|     M|     other|   30350|
|    148| 33|     M|  engineer|   97006|
|    313| 41|     M| marketing|   60035|
|    478| 29|     M|     other|   10019|
|    332| 20|     M|   student|   40504|
|    492| 57|     M|  educator|   94618|
|    833| 34|     M|    writer|   90019|
|    470| 24|     M|programmer|   10021|
|     21| 26|     M|    writer|   30068|
|    265| 26|     M| executive|   84601|
|     33| 23|     M|   student|   27510|
|    133| 53|     M|  engineer|   78602|
|    682| 23|     M|programmer|   55128|
|    650| 42|     M|  engineer|   83814|
+-------+---+------+----------+--------+
only showing top

#### Create and fill a new DataFrame named user_ratings with columns user_id and rating max 10 column. Both user_data and user_ratings share the user_id column. Combine these two DataFrames to create a new DataFrame that includes user information and their corresponding ratings. Make sure to keep only the users present in both DataFrames.

In [34]:
# Indian user data
user_data = [
    (1, "Rahul"),
    (2, "Priya"),
    (3, "Amit"),
    (4, "Neha"),
    (5, "Rajesh"),
    (6, "Sneha"),
    (7, "Arun"),
    (8, "Anjali"),
    (9, "Vikram"),
    (10, "Swati")
]

# Defining the schema for Indian user data DataFrame
user_schema = ["user_id", "name"]

# Create the Indian user data DataFrame
user_data_df = spark.createDataFrame(user_data,user_schema)

# Show the Indian user data DataFrame
user_data_df.show()


+-------+------+
|user_id|  name|
+-------+------+
|      1| Rahul|
|      2| Priya|
|      3|  Amit|
|      4|  Neha|
|      5|Rajesh|
|      6| Sneha|
|      7|  Arun|
|      8|Anjali|
|      9|Vikram|
|     10| Swati|
+-------+------+



In [35]:
user_ratings_data = [
    (1, 9),
    (2, 8),
    (3, 10),
    (4, 7),
    (5, 9),
    (6, 6),
    (7, 10),
    (8, 8),
    (9, 7),
    (10, 9)
]

# Define the schema for user_ratings DataFrame
user_ratings_schema = ["user_id", "rating"]

# Create the user_ratings DataFrame
user_ratings = spark.createDataFrame(user_ratings_data, user_ratings_schema)

# Select relevant columns from "user_ratings"
user_ratings.show()

+-------+------+
|user_id|rating|
+-------+------+
|      1|     9|
|      2|     8|
|      3|    10|
|      4|     7|
|      5|     9|
|      6|     6|
|      7|    10|
|      8|     8|
|      9|     7|
|     10|     9|
+-------+------+



In [38]:
combined_df = user_data_df.join(user_ratings, on="user_id")

combined_df.show()



+-------+------+------+
|user_id|  name|rating|
+-------+------+------+
|      1| Rahul|     9|
|      2| Priya|     8|
|      3|  Amit|    10|
|      4|  Neha|     7|
|      5|Rajesh|     9|
|      6| Sneha|     6|
|      7|  Arun|    10|
|      8|Anjali|     8|
|      9|Vikram|     7|
|     10| Swati|     9|
+-------+------+------+



                                                                                