## Basic Structured Operations


### Step 1: Initialize PySpark Session


In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Create a Spark session
spark = SparkSession.builder.appName("day2").getOrCreate()


### Step 2: Load the Dataset


In [13]:
# Load the Occupation dataset into a Spark DataFrame
data_path = "occupation.csv"  # Replace with the actual path
occupation = spark.read.csv(data_path, header=True, inferSchema=True)


In [14]:
occupation.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)



### Problem 1: Selecting Specific Columns
Problem: Select the "user_id," "age," and "occupation" columns from the occupation DataFrame.

In [8]:
occupation.select("user_id", "age", "occupation").show() #simply selects the columns and displays them

+-------+---+-------------+
|user_id|age|   occupation|
+-------+---+-------------+
|      1| 24|   technician|
|      2| 53|        other|
|      3| 23|       writer|
|      4| 24|   technician|
|      5| 33|        other|
|      6| 42|    executive|
|      7| 57|administrator|
|      8| 36|administrator|
|      9| 29|      student|
|     10| 53|       lawyer|
|     11| 39|        other|
|     12| 28|        other|
|     13| 47|     educator|
|     14| 45|    scientist|
|     15| 49|     educator|
|     16| 21|entertainment|
|     17| 30|   programmer|
|     18| 35|        other|
|     19| 40|    librarian|
|     20| 42|    homemaker|
+-------+---+-------------+
only showing top 20 rows



### Problem 2: Filtering Rows based on Condition
Problem: Find the users who are older than 30 years from the occupation DataFrame.

In [15]:
occupation.select("*").filter(col("age")>30).show() #selecting everything and filtering the age column to display age greater than 30

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|      2| 53|     F|        other|   94043|
|      5| 33|     F|        other|   15213|
|      6| 42|     M|    executive|   98101|
|      7| 57|     M|administrator|   91344|
|      8| 36|     M|administrator|   05201|
|     10| 53|     M|       lawyer|   90703|
|     11| 39|     F|        other|   30329|
|     13| 47|     M|     educator|   29206|
|     14| 45|     M|    scientist|   55106|
|     15| 49|     F|     educator|   97301|
|     18| 35|     F|        other|   37212|
|     19| 40|     M|    librarian|   02138|
|     20| 42|     F|    homemaker|   95660|
|     25| 39|     M|     engineer|   55107|
|     26| 49|     M|     engineer|   21044|
|     27| 40|     F|    librarian|   30030|
|     28| 32|     M|       writer|   55369|
|     29| 41|     M|   programmer|   94043|
|     34| 38|     F|administrator|   42141|
|     39| 41|     M|entertainmen

### Problem 3: Counting and Grouping
Problem: Count the number of users in each occupation from the occupation DataFrame.

In [24]:
occupation.groupBy("occupation").count().show() #groups by occupation and counts the total users in them

+-------------+-----+
|   occupation|count|
+-------------+-----+
|    librarian|   51|
|      retired|   14|
|       lawyer|   12|
|         none|    9|
|       writer|   45|
|   programmer|   66|
|    marketing|   26|
|        other|  105|
|    executive|   32|
|    scientist|   31|
|      student|  196|
|     salesman|   12|
|       artist|   28|
|   technician|   27|
|administrator|   79|
|     engineer|   67|
|   healthcare|   16|
|     educator|   95|
|entertainment|   18|
|    homemaker|    7|
+-------------+-----+
only showing top 20 rows



### Problem 4: Adding a New Column
Problem: Add a new column "age_group" to the occupation DataFrame based on the age of the users. Divide users into age groups: "18-25", "26-35", "36-50", and "51+".

In [26]:
from pyspark.sql.functions import when, lit #importing when and lit, used to specify condition and input value for condition. lit inputs literal/strings
#withColumn creates a new column
occupation.withColumn("age_group", \ 
   when((occupation.age >= 18) & (occupation.age <= 25), lit("18-25")) \
     .when((occupation.age >= 26) & (occupation.age <= 35), lit("26-35")) \
      .when((occupation.age >= 36) & (occupation.age <= 50), lit("36-50")) \
     .otherwise(lit("51+")) \
  ).show()

+-------+---+------+-------------+--------+---------+
|user_id|age|gender|   occupation|zip_code|age_group|
+-------+---+------+-------------+--------+---------+
|      1| 24|     M|   technician|   85711|    18-25|
|      2| 53|     F|        other|   94043|      51+|
|      3| 23|     M|       writer|   32067|    18-25|
|      4| 24|     M|   technician|   43537|    18-25|
|      5| 33|     F|        other|   15213|    26-35|
|      6| 42|     M|    executive|   98101|    36-50|
|      7| 57|     M|administrator|   91344|      51+|
|      8| 36|     M|administrator|   05201|    36-50|
|      9| 29|     M|      student|   01002|    26-35|
|     10| 53|     M|       lawyer|   90703|      51+|
|     11| 39|     F|        other|   30329|    36-50|
|     12| 28|     F|        other|   06405|    26-35|
|     13| 47|     M|     educator|   29206|    36-50|
|     14| 45|     M|    scientist|   55106|    36-50|
|     15| 49|     F|     educator|   97301|    36-50|
|     16| 21|     M|entertai

### Problem 5: Creating DataFrames and Converting to Spark Types
Problem: Given the provided code snippet, create a DataFrame df using the given data and schema. The schema includes columns for firstname, middlename, lastname, id, gender, and salary. After creating the DataFrame, print its schema and display its content without truncation.

In [45]:
Sample_Data = [("James", " ","Smith", "36636", "M", 3000),	
          ("Micheal", "Rose", " ", "40288", "M", 4000),
          ("Robert", " ","Williams", "42114", "M", 4000),
          ("Maria", "Anne","Jones", "39192", "F", 4000),
          ("Jen", "Mary", "Brown", " ", "F", -1)
  ]	#inputting the data

Sample_schema = ["firstname","middlename","lastname","id","gender","salary"]	
dataframe = spark.createDataFrame(data = Sample_Data, schema = Sample_schema)	

dataframe.printSchema()	
dataframe.show(truncate=False)	

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



                                                                                

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Micheal  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



### Problem 6: Adding and Renaming Columns
Problem: Add a new column "gender" to the existing DataFrame and rename the "Age" column to "Years".

In [30]:
df = occupation.withColumn("gender",lit("Unknown")) # adds a new column gender but since it already has it simply overwrites 
df2 = df.withColumnRenamed("age","Years") #renames the column age to years
df2.show()

+-------+-----+-------+-------------+--------+
|user_id|Years| gender|   occupation|zip_code|
+-------+-----+-------+-------------+--------+
|      1|   24|Unknown|   technician|   85711|
|      2|   53|Unknown|        other|   94043|
|      3|   23|Unknown|       writer|   32067|
|      4|   24|Unknown|   technician|   43537|
|      5|   33|Unknown|        other|   15213|
|      6|   42|Unknown|    executive|   98101|
|      7|   57|Unknown|administrator|   91344|
|      8|   36|Unknown|administrator|   05201|
|      9|   29|Unknown|      student|   01002|
|     10|   53|Unknown|       lawyer|   90703|
|     11|   39|Unknown|        other|   30329|
|     12|   28|Unknown|        other|   06405|
|     13|   47|Unknown|     educator|   29206|
|     14|   45|Unknown|    scientist|   55106|
|     15|   49|Unknown|     educator|   97301|
|     16|   21|Unknown|entertainment|   10309|
|     17|   30|Unknown|   programmer|   06355|
|     18|   35|Unknown|        other|   37212|
|     19|   4

### Problem 7: Filtering Rows and Sorting
Problem: Filter out users who are younger than 30 years and sort the DataFrame by age in descending order.

In [67]:
from pyspark.sql.functions import desc
df3 = df2.select("*").filter(col("Years")>30)
df3.sort(desc("Years")).show()

+-------+-----+-------+-------------+--------+
|user_id|Years| gender|   occupation|zip_code|
+-------+-----+-------+-------------+--------+
|    481|   73|Unknown|      retired|   37771|
|    767|   70|Unknown|     engineer|   00000|
|    803|   70|Unknown|administrator|   78212|
|    860|   70|Unknown|      retired|   48322|
|    559|   69|Unknown|    executive|   10022|
|    585|   69|Unknown|    librarian|   98501|
|    349|   68|Unknown|      retired|   61455|
|    573|   68|Unknown|      retired|   48911|
|    211|   66|Unknown|     salesman|   32605|
|    318|   65|Unknown|      retired|   06518|
|    564|   65|Unknown|      retired|   94591|
|    651|   65|Unknown|      retired|   02903|
|    423|   64|Unknown|        other|   91606|
|    845|   64|Unknown|       doctor|   97405|
|    364|   63|Unknown|     engineer|   01810|
|    777|   63|Unknown|   programmer|   01810|
|    858|   63|Unknown|     educator|   09645|
|    266|   62|Unknown|administrator|   78756|
|    520|   6

### Problem 8: Repartitioning and Collecting Rows
Problem: Repartition the DataFrame into 2 partitions without shuffling the data, then collect and display all rows in the driver and print number of partitions

In [65]:
partitioned_data = dataframe.coalesce(2) #partitions the dataframe without shuffling
rows = partitioned_data.collect() #for smaller datasets, use this. It stores in array format
for a in rows:
    print(a)

Row(firstname='James', middlename=' ', lastname='Smith', id='36636', gender='M', salary=3000)
Row(firstname='Micheal', middlename='Rose', lastname=' ', id='40288', gender='M', salary=4000)
Row(firstname='Robert', middlename=' ', lastname='Williams', id='42114', gender='M', salary=4000)
Row(firstname='Maria', middlename='Anne', lastname='Jones', id='39192', gender='F', salary=4000)
Row(firstname='Jen', middlename='Mary', lastname='Brown', id=' ', gender='F', salary=-1)


In [66]:
number_of_partition = partitioned_data.rdd.getNumPartitions() #to get current number of partitions of a dataframe. It's called on the dataframe underlying RDD
print("Number of Partitions =",number_of_partition)

Number of Partitions = 2


### Additional questions:

Use both spark SQL and Pyspark to obtain answer wherever relevant

#### Filter out rows where the age is greater than 30 and create a new DataFrame. Then, add a new column named "is_elderly" with a value of "True" for these rows and "False" otherwise.Rename the "gender" column to "sex".

In [120]:
# Spark SQL
occupation.createOrReplaceTempView("occu_temp")

result = spark.sql("""select user_id, 
age, 
gender as sex,
occupation, 
zip_code,
case when age > 30 then True else False end as is_elderly
from occu_temp""")
result.show()

+-------+---+---+-------------+--------+----------+
|user_id|age|sex|   occupation|zip_code|is_elderly|
+-------+---+---+-------------+--------+----------+
|      1| 24|  M|   technician|   85711|     false|
|      2| 53|  F|        other|   94043|      true|
|      3| 23|  M|       writer|   32067|     false|
|      4| 24|  M|   technician|   43537|     false|
|      5| 33|  F|        other|   15213|      true|
|      6| 42|  M|    executive|   98101|      true|
|      7| 57|  M|administrator|   91344|      true|
|      8| 36|  M|administrator|   05201|      true|
|      9| 29|  M|      student|   01002|     false|
|     10| 53|  M|       lawyer|   90703|      true|
|     11| 39|  F|        other|   30329|      true|
|     12| 28|  F|        other|   06405|     false|
|     13| 47|  M|     educator|   29206|      true|
|     14| 45|  M|    scientist|   55106|      true|
|     15| 49|  F|     educator|   97301|      true|
|     16| 21|  M|entertainment|   10309|     false|
|     17| 30

In [79]:
# Pyspark
#from pyspark.sql.functions import when, lit
occupation1 = occupation.withColumnRenamed("gender", "sex")
occupation1.withColumn("is_elderly", \
   when((occupation.age > 30), lit("true")) \
     .otherwise(lit("false")) \
  ).show()

+-------+---+---+-------------+--------+----------+
|user_id|age|sex|   occupation|zip_code|is_elderly|
+-------+---+---+-------------+--------+----------+
|      1| 24|  M|   technician|   85711|     false|
|      2| 53|  F|        other|   94043|      true|
|      3| 23|  M|       writer|   32067|     false|
|      4| 24|  M|   technician|   43537|     false|
|      5| 33|  F|        other|   15213|      true|
|      6| 42|  M|    executive|   98101|      true|
|      7| 57|  M|administrator|   91344|      true|
|      8| 36|  M|administrator|   05201|      true|
|      9| 29|  M|      student|   01002|     false|
|     10| 53|  M|       lawyer|   90703|      true|
|     11| 39|  F|        other|   30329|      true|
|     12| 28|  F|        other|   06405|     false|
|     13| 47|  M|     educator|   29206|      true|
|     14| 45|  M|    scientist|   55106|      true|
|     15| 49|  F|     educator|   97301|      true|
|     16| 21|  M|entertainment|   10309|     false|
|     17| 30

#### Calculate the average age of male and female users separately. Present the result in a new DataFrame with columns "gender" and "avg_age".

In [81]:
# Spark SQL
occupation.createOrReplaceTempView("occu_temp1")
resul = spark.sql("""select gender, 
avg(age) as avg_age
from occu_temp1
group by gender""")
resul.show()


+------+------------------+
|gender|           avg_age|
+------+------------------+
|     F| 33.81318681318681|
|     M|34.149253731343286|
+------+------------------+



In [91]:
# Pyspark
from pyspark.sql.functions import avg
occuu1 = occupation.groupBy("gender").agg(avg("age").alias("avg_age"))
occuu1.show()

+------+------------------+
|gender|           avg_age|
+------+------------------+
|     F| 33.81318681318681|
|     M|34.149253731343286|
+------+------------------+



#### Add a new column named "full_name" to the dataset by concatenating the "user_id" and "occupation" columns. Then, rename the "zip_code" column to "postal_code" in the same DataFrame.

In [95]:
# Spark SQL
occupation.createOrReplaceTempView("occu_temp2")

result1 = spark.sql("""select user_id, 
age, 
gender,
occupation, 
zip_code as postal_code,
concat(user_id, occupation) AS full_name
from occu_temp2""")

result1.show()

+-------+---+------+-------------+-----------+---------------+
|user_id|age|gender|   occupation|postal_code|      full_name|
+-------+---+------+-------------+-----------+---------------+
|      1| 24|     M|   technician|      85711|    1technician|
|      2| 53|     F|        other|      94043|         2other|
|      3| 23|     M|       writer|      32067|        3writer|
|      4| 24|     M|   technician|      43537|    4technician|
|      5| 33|     F|        other|      15213|         5other|
|      6| 42|     M|    executive|      98101|     6executive|
|      7| 57|     M|administrator|      91344| 7administrator|
|      8| 36|     M|administrator|      05201| 8administrator|
|      9| 29|     M|      student|      01002|       9student|
|     10| 53|     M|       lawyer|      90703|       10lawyer|
|     11| 39|     F|        other|      30329|        11other|
|     12| 28|     F|        other|      06405|        12other|
|     13| 47|     M|     educator|      29206|     13ed

In [107]:
# Pyspark
#to be discussed
from pyspark.sql.functions import concat
dff2=occupation.select(concat(occupation.user_id, occupation.occupation).alias("full_name"), "user_id", "age", "gender", "occupation", "zip_code")
dff2.show()

+---------------+-------+---+------+-------------+--------+
|      full_name|user_id|age|gender|   occupation|zip_code|
+---------------+-------+---+------+-------------+--------+
|    1technician|      1| 24|     M|   technician|   85711|
|         2other|      2| 53|     F|        other|   94043|
|        3writer|      3| 23|     M|       writer|   32067|
|    4technician|      4| 24|     M|   technician|   43537|
|         5other|      5| 33|     F|        other|   15213|
|     6executive|      6| 42|     M|    executive|   98101|
| 7administrator|      7| 57|     M|administrator|   91344|
| 8administrator|      8| 36|     M|administrator|   05201|
|       9student|      9| 29|     M|      student|   01002|
|       10lawyer|     10| 53|     M|       lawyer|   90703|
|        11other|     11| 39|     F|        other|   30329|
|        12other|     12| 28|     F|        other|   06405|
|     13educator|     13| 47|     M|     educator|   29206|
|    14scientist|     14| 45|     M|    

#### Filter out rows where occupation is 'technician', select only the "user_id" and "age" columns, and then add a new column "age_diff" that calculates the difference between the user's age and the average age in the dataset.

In [None]:
# Pyspark



#### Divide the dataset into two DataFrames: one with male users and another with female users. Repartition both DataFrames to have 2 partitions each. Then, union these two DataFrames together and display the resulting DataFrame.

#### Create and fill a new DataFrame named user_ratings with columns user_id and rating max 10 column. Both user_data and user_ratings share the user_id column. Combine these two DataFrames to create a new DataFrame that includes user information and their corresponding ratings. Make sure to keep only the users present in both DataFrames.