## Basic Structured Operations


### Step 1: Initialize PySpark Session


In [1]:
#Importing the library to initiate the SparkSession
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("day2").getOrCreate()


23/09/04 15:41:00 WARN Utils: Your hostname, anujs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.68 instead (on interface en0)
23/09/04 15:41:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/04 15:41:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


23/09/04 15:41:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Step 2: Load the Dataset


In [2]:
# Load the Chipotle dataset into a Spark DataFrame
data_path = "/Users/anujkhadka/Fusemachines47/ALL SPARK/Spark_Assignment_2023/Day_2/occupation.csv"  # Replace with the actual path
occupation = spark.read.csv(data_path, header=True, inferSchema=True)


In [3]:
#To get the schema of the datasets
occupation.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)



### Problem 1: Selecting Specific Columns
Problem: Select the "user_id," "age," and "occupation" columns from the occupation DataFrame.

In [4]:
#Selectiong 'user_id' 'age' and 'occupation' from the dataframe

q1 = occupation.select('user_id',"age","occupation")
q1.show()

+-------+---+-------------+
|user_id|age|   occupation|
+-------+---+-------------+
|      1| 24|   technician|
|      2| 53|        other|
|      3| 23|       writer|
|      4| 24|   technician|
|      5| 33|        other|
|      6| 42|    executive|
|      7| 57|administrator|
|      8| 36|administrator|
|      9| 29|      student|
|     10| 53|       lawyer|
|     11| 39|        other|
|     12| 28|        other|
|     13| 47|     educator|
|     14| 45|    scientist|
|     15| 49|     educator|
|     16| 21|entertainment|
|     17| 30|   programmer|
|     18| 35|        other|
|     19| 40|    librarian|
|     20| 42|    homemaker|
+-------+---+-------------+
only showing top 20 rows



### Problem 2: Filtering Rows based on Condition
Problem: Find the users who are older than 30 years from the occupation DataFrame.

In [5]:
#Finding the users older than 30 years

older_than_30 = occupation.filter(occupation.age>30)
older_than_30.show()

+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|      2| 53|     F|        other|   94043|
|      5| 33|     F|        other|   15213|
|      6| 42|     M|    executive|   98101|
|      7| 57|     M|administrator|   91344|
|      8| 36|     M|administrator|   05201|
|     10| 53|     M|       lawyer|   90703|
|     11| 39|     F|        other|   30329|
|     13| 47|     M|     educator|   29206|
|     14| 45|     M|    scientist|   55106|
|     15| 49|     F|     educator|   97301|
|     18| 35|     F|        other|   37212|
|     19| 40|     M|    librarian|   02138|
|     20| 42|     F|    homemaker|   95660|
|     25| 39|     M|     engineer|   55107|
|     26| 49|     M|     engineer|   21044|
|     27| 40|     F|    librarian|   30030|
|     28| 32|     M|       writer|   55369|
|     29| 41|     M|   programmer|   94043|
|     34| 38|     F|administrator|   42141|
|     39| 41|     M|entertainmen

### Problem 3: Counting and Grouping
Problem: Count the number of users in each occupation from the occupation DataFrame.

In [6]:
#Counting the no. of users in each occupation from occupation dataframe
occupation.groupBy("occupation").count().withColumnRenamed("count","user_count").show()

+-------------+----------+
|   occupation|user_count|
+-------------+----------+
|    librarian|        51|
|      retired|        14|
|       lawyer|        12|
|         none|         9|
|       writer|        45|
|   programmer|        66|
|    marketing|        26|
|        other|       105|
|    executive|        32|
|    scientist|        31|
|      student|       196|
|     salesman|        12|
|       artist|        28|
|   technician|        27|
|administrator|        79|
|     engineer|        67|
|   healthcare|        16|
|     educator|        95|
|entertainment|        18|
|    homemaker|         7|
+-------------+----------+
only showing top 20 rows



### Problem 4: Adding a New Column
Problem: Add a new column "age_group" to the occupation DataFrame based on the age of the users. Divide users into age groups: "18-25", "26-35", "36-50", and "51+".

In [7]:
from pyspark.sql.functions import col,when

#Adding a new column called 'age_group'
age_grp = occupation.withColumn("age_group",
                      when((occupation.age).between(18,25), "18-25")
                      .when((occupation.age).between(26,35), "26-35")
                      .when((occupation.age).between(36,50), "36-50")
                      .otherwise("51+")
                      )

age_grp.show()

+-------+---+------+-------------+--------+---------+
|user_id|age|gender|   occupation|zip_code|age_group|
+-------+---+------+-------------+--------+---------+
|      1| 24|     M|   technician|   85711|    18-25|
|      2| 53|     F|        other|   94043|      51+|
|      3| 23|     M|       writer|   32067|    18-25|
|      4| 24|     M|   technician|   43537|    18-25|
|      5| 33|     F|        other|   15213|    26-35|
|      6| 42|     M|    executive|   98101|    36-50|
|      7| 57|     M|administrator|   91344|      51+|
|      8| 36|     M|administrator|   05201|    36-50|
|      9| 29|     M|      student|   01002|    26-35|
|     10| 53|     M|       lawyer|   90703|      51+|
|     11| 39|     F|        other|   30329|    36-50|
|     12| 28|     F|        other|   06405|    26-35|
|     13| 47|     M|     educator|   29206|    36-50|
|     14| 45|     M|    scientist|   55106|    36-50|
|     15| 49|     F|     educator|   97301|    36-50|
|     16| 21|     M|entertai

### Problem 5: Creating DataFrames and Converting to Spark Types
Problem: Given the provided code snippet, create a DataFrame df using the given data and schema. The schema includes columns for firstname, middlename, lastname, id, gender, and salary. After creating the DataFrame, print its schema and display its content without truncation.


In [8]:
from pyspark.sql import SparkSession

def create_session():
  spk = SparkSession.builder \
      .master("local") \
      .appName("dataframe_building") \
      .getOrCreate()
  return spk

spark1 = create_session()

23/09/04 15:45:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [10]:
df = spark1.createDataFrame([
    ("james","","smith","36636","M",3000),
    ("micheal","rose","","40288","M",4000),
    ("robert","","williams","42114","M",4000),
    ("maria","anne","jones","39192","F",4000),
    ("jen","mary","brown","","F",-1),
],["firstname","middlename","lastname","id","gender","salary"])

df.show()

                                                                                

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    james|          |   smith|36636|     M|  3000|
|  micheal|      rose|        |40288|     M|  4000|
|   robert|          |williams|42114|     M|  4000|
|    maria|      anne|   jones|39192|     F|  4000|
|      jen|      mary|   brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [11]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



### Problem 6: Adding and Renaming Columns
Problem: Add a new column "gender" to the existing DataFrame and rename the "Age" column to "Years".

In [12]:
from pyspark.sql.functions import lit,col

unknown_gender = occupation.withColumn("gender",lit("unknown"))
check = unknown_gender.withColumnRenamed("age","Years")
check.show()

+-------+-----+-------+-------------+--------+
|user_id|Years| gender|   occupation|zip_code|
+-------+-----+-------+-------------+--------+
|      1|   24|unknown|   technician|   85711|
|      2|   53|unknown|        other|   94043|
|      3|   23|unknown|       writer|   32067|
|      4|   24|unknown|   technician|   43537|
|      5|   33|unknown|        other|   15213|
|      6|   42|unknown|    executive|   98101|
|      7|   57|unknown|administrator|   91344|
|      8|   36|unknown|administrator|   05201|
|      9|   29|unknown|      student|   01002|
|     10|   53|unknown|       lawyer|   90703|
|     11|   39|unknown|        other|   30329|
|     12|   28|unknown|        other|   06405|
|     13|   47|unknown|     educator|   29206|
|     14|   45|unknown|    scientist|   55106|
|     15|   49|unknown|     educator|   97301|
|     16|   21|unknown|entertainment|   10309|
|     17|   30|unknown|   programmer|   06355|
|     18|   35|unknown|        other|   37212|
|     19|   4

### Problem 7: Filtering Rows and Sorting
Problem: Filter out users who are younger than 30 years and sort the DataFrame by age in descending order.

In [13]:
#Filtering out users who are younger than 30 years

filtered_task = check.filter(occupation.age>30)
sorted_result = filtered_task.orderBy(occupation.age.desc())
sorted_result.show()

+-------+-----+-------+-------------+--------+
|user_id|Years| gender|   occupation|zip_code|
+-------+-----+-------+-------------+--------+
|    481|   73|unknown|      retired|   37771|
|    767|   70|unknown|     engineer|   00000|
|    803|   70|unknown|administrator|   78212|
|    860|   70|unknown|      retired|   48322|
|    559|   69|unknown|    executive|   10022|
|    585|   69|unknown|    librarian|   98501|
|    349|   68|unknown|      retired|   61455|
|    573|   68|unknown|      retired|   48911|
|    211|   66|unknown|     salesman|   32605|
|    318|   65|unknown|      retired|   06518|
|    564|   65|unknown|      retired|   94591|
|    651|   65|unknown|      retired|   02903|
|    423|   64|unknown|        other|   91606|
|    845|   64|unknown|       doctor|   97405|
|    364|   63|unknown|     engineer|   01810|
|    777|   63|unknown|   programmer|   01810|
|    858|   63|unknown|     educator|   09645|
|    266|   62|unknown|administrator|   78756|
|    520|   6

### Problem 8: Repartitioning and Collecting Rows
Problem: Repartition the DataFrame into 2 partitions without shuffling the data, then collect and display all rows in the driver and print number of partitions

In [14]:
# Repartition the DataFrame into 2 partitions without shuffling
repartitioned_df = df.coalesce(2)

# Collect and display all rows in the driver
all_rows = repartitioned_df.collect()
for row in all_rows:
    print(row)

# Get the number of partitions
num_partitions = repartitioned_df.rdd.getNumPartitions()
print(num_partitions)

Row(firstname='james', middlename='', lastname='smith', id='36636', gender='M', salary=3000)
Row(firstname='micheal', middlename='rose', lastname='', id='40288', gender='M', salary=4000)
Row(firstname='robert', middlename='', lastname='williams', id='42114', gender='M', salary=4000)
Row(firstname='maria', middlename='anne', lastname='jones', id='39192', gender='F', salary=4000)
Row(firstname='jen', middlename='mary', lastname='brown', id='', gender='F', salary=-1)
2


In [16]:
# Repartition the DataFrame into 2 partitions without shuffling
repartitioned_df = df.coalesce(2)

# Collect and display all rows in the driver
all_rows = repartitioned_df.collect()
for row in all_rows:
    print(row)

# Get the number of partitions
num_partitions = repartitioned_df.rdd.getNumPartitions()
print(num_partitions)

Row(firstname='james', middlename='', lastname='smith', id='36636', gender='M', salary=3000)
Row(firstname='micheal', middlename='rose', lastname='', id='40288', gender='M', salary=4000)
Row(firstname='robert', middlename='', lastname='williams', id='42114', gender='M', salary=4000)
Row(firstname='maria', middlename='anne', lastname='jones', id='39192', gender='F', salary=4000)
Row(firstname='jen', middlename='mary', lastname='brown', id='', gender='F', salary=-1)
2


### Additional questions:

Use both spark SQL and Pyspark to obtain answer wherever relevant

#### Filter out rows where the age is greater than 30 and create a new DataFrame. Then, add a new column named "is_elderly" with a value of "True" for these rows and "False" otherwise.Rename the "gender" column to "sex".

In [18]:
# Spark SQL

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# Load your data into a DataFrame (replace 'your_data_path' with the actual path)
data = spark.read.csv('/content/occupation.csv', header=True, inferSchema=True)

# Create a temporary view to use Spark SQL
data.createOrReplaceTempView("people")

# Use Spark SQL to filter and transform the data
filtered_data_sql = spark.sql("""
    SELECT *,
           CASE WHEN age > 30 THEN True ELSE False END as is_elderly,
           gender as sex
    FROM people
    WHERE age > 30
""")

# Show the resulting DataFrame
filtered_data_sql.show()


+-------+---+------+-------------+--------+----------+---+
|user_id|age|gender|   occupation|zip_code|is_elderly|sex|
+-------+---+------+-------------+--------+----------+---+
|      2| 53|     F|        other|   94043|      true|  F|
|      5| 33|     F|        other|   15213|      true|  F|
|      6| 42|     M|    executive|   98101|      true|  M|
|      7| 57|     M|administrator|   91344|      true|  M|
|      8| 36|     M|administrator|   05201|      true|  M|
|     10| 53|     M|       lawyer|   90703|      true|  M|
|     11| 39|     F|        other|   30329|      true|  F|
|     13| 47|     M|     educator|   29206|      true|  M|
|     14| 45|     M|    scientist|   55106|      true|  M|
|     15| 49|     F|     educator|   97301|      true|  F|
|     18| 35|     F|        other|   37212|      true|  F|
|     19| 40|     M|    librarian|   02138|      true|  M|
|     20| 42|     F|    homemaker|   95660|      true|  F|
|     25| 39|     M|     engineer|   55107|      true|  

In [None]:
# Pyspark



#### Calculate the average age of male and female users separately. Present the result in a new DataFrame with columns "gender" and "avg_age".

In [16]:
# Spark SQL

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("GenderAvgAge").getOrCreate()

# Sample user data DataFrame (replace this with your actual data)
data = [
    ("Male", 25),
    ("Female", 30),
    ("Male", 22),
    ("Female", 28),
    ("Male", 35)
]

# Create a DataFrame
user_data = spark.createDataFrame(data, ["gender", "age"])

# Register the DataFrame as a temporary SQL table
user_data.createOrReplaceTempView("user_data")

# Calculate the average age for each gender using SQL
average_age_query = """
    SELECT gender, AVG(age) as avg_age
    FROM user_data
    GROUP BY gender
"""

# Execute the SQL query
average_age_result = spark.sql(average_age_query)

# Show the result
average_age_result.show()


23/09/04 15:49:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+------+------------------+
|gender|           avg_age|
+------+------------------+
|  Male|27.333333333333332|
|Female|              29.0|
+------+------------------+



In [15]:
# Pyspark

import pandas as pd

# Sample user_data DataFrame (replace this with your actual data)
data = {
    'gender': ['Male', 'Female', 'Male', 'Female', 'Male'],
    'age': [25, 30, 22, 28, 35]
}

user_data = pd.DataFrame(data)

# Calculate average age for each gender
average_age_by_gender = user_data.groupby('gender')['age'].mean().reset_index()

# Rename the columns
average_age_by_gender.columns = ['gender', 'avg_age']

# Display the result
print(average_age_by_gender)


   gender    avg_age
0  Female  29.000000
1    Male  27.333333


#### Add a new column named "full_name" to the dataset by concatenating the "user_id" and "occupation" columns. Then, rename the "zip_code" column to "postal_code" in the same DataFrame.

In [17]:
# Spark SQL

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col

# Create a Spark session
spark = SparkSession.builder.appName("AddColumnAndRename").getOrCreate()

# Sample user data DataFrame (replace this with your actual data)
data = [
    (1, "John", "Doe", "Engineer", "12345"),
    (2, "Jane", "Smith", "Teacher", "54321"),
    (3, "Alice", "Johnson", "Doctor", "67890"),
]

# Create a DataFrame
user_data = spark.createDataFrame(data, ["user_id", "first_name", "last_name", "occupation", "zip_code"])

# Add a new column "full_name" by concatenating "user_id" and "occupation"
user_data = user_data.withColumn("full_name", concat_ws(" - ", col("user_id"), col("occupation")))

# Rename the "zip_code" column to "postal_code"
user_data = user_data.withColumnRenamed("zip_code", "postal_code")

# Show the updated DataFrame
user_data.show()


23/09/04 15:50:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+----------+---------+----------+-----------+------------+
|user_id|first_name|last_name|occupation|postal_code|   full_name|
+-------+----------+---------+----------+-----------+------------+
|      1|      John|      Doe|  Engineer|      12345|1 - Engineer|
|      2|      Jane|    Smith|   Teacher|      54321| 2 - Teacher|
|      3|     Alice|  Johnson|    Doctor|      67890|  3 - Doctor|
+-------+----------+---------+----------+-----------+------------+



In [18]:
# Pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col

# Create a Spark session
spark = SparkSession.builder.appName("AddColumnAndRename").getOrCreate()

# Sample user data DataFrame (replace this with your actual data)
data = [
    (1, "John", "Doe", "Engineer", "12345"),
    (2, "Jane", "Smith", "Teacher", "54321"),
    (3, "Alice", "Johnson", "Doctor", "67890"),
]

# Create a DataFrame
user_data = spark.createDataFrame(data, ["user_id", "first_name", "last_name", "occupation", "zip_code"])

# Add a new column "full_name" by concatenating "user_id" and "occupation"
user_data = user_data.withColumn("full_name", concat_ws(" - ", col("user_id"), col("occupation")))

# Rename the "zip_code" column to "postal_code"
user_data = user_data.withColumnRenamed("zip_code", "postal_code")

# Show the updated DataFrame
user_data.show()


+-------+----------+---------+----------+-----------+------------+
|user_id|first_name|last_name|occupation|postal_code|   full_name|
+-------+----------+---------+----------+-----------+------------+
|      1|      John|      Doe|  Engineer|      12345|1 - Engineer|
|      2|      Jane|    Smith|   Teacher|      54321| 2 - Teacher|
|      3|     Alice|  Johnson|    Doctor|      67890|  3 - Doctor|
+-------+----------+---------+----------+-----------+------------+



#### Filter out rows where occupation is 'technician', select only the "user_id" and "age" columns, and then add a new column "age_diff" that calculates the difference between the user's age and the average age in the dataset.

In [None]:
# Spark SQL



In [None]:
# Pyspark



#### Divide the dataset into two DataFrames: one with male users and another with female users. Repartition both DataFrames to have 2 partitions each. Then, union these two DataFrames together and display the resulting DataFrame.

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("GenderDivideRepartitionUnion").getOrCreate()

# Sample user data DataFrame (replace this with your actual data)
data = [
    (1, "John", "Doe", "Male", "Engineer"),
    (2, "Jane", "Smith", "Female", "Teacher"),
    (3, "Alice", "Johnson", "Female", "Doctor"),
    (4, "Bob", "Brown", "Male", "Artist"),
]

# Create a DataFrame
user_data = spark.createDataFrame(data, ["user_id", "first_name", "last_name", "gender", "occupation"])

# Divide the dataset into two DataFrames: one with male users and one with female users
male_users = user_data.filter(col("gender") == "Male")
female_users = user_data.filter(col("gender") == "Female")

# Repartition both DataFrames to have 2 partitions each
male_users = male_users.repartition(2)
female_users = female_users.repartition(2)

# Union the two DataFrames
combined_users = male_users.union(female_users)

# Show the resulting DataFrame
combined_users.show()


23/09/04 15:52:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+----------+---------+------+----------+
|user_id|first_name|last_name|gender|occupation|
+-------+----------+---------+------+----------+
|      1|      John|      Doe|  Male|  Engineer|
|      4|       Bob|    Brown|  Male|    Artist|
|      2|      Jane|    Smith|Female|   Teacher|
|      3|     Alice|  Johnson|Female|    Doctor|
+-------+----------+---------+------+----------+



#### Create and fill a new DataFrame named user_ratings with columns user_id and rating max 10 column. Both user_data and user_ratings share the user_id column. Combine these two DataFrames to create a new DataFrame that includes user information and their corresponding ratings. Make sure to keep only the users present in both DataFrames.

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("UserRatings").getOrCreate()

# Sample user data DataFrame (replace this with your actual data)
user_data_data = [
    (1, "John", "Doe", "Engineer"),
    (2, "Jane", "Smith", "Teacher"),
    (3, "Alice", "Johnson", "Doctor"),
]

# Create a user data DataFrame
user_data = spark.createDataFrame(user_data_data, ["user_id", "first_name", "last_name", "occupation"])

# Sample user ratings DataFrame (replace this with your actual data)
user_ratings_data = [
    (1, 8),
    (2, 9),
    (4, 7),  # User not present in user_data
    (3, 10),
]

# Create a user ratings DataFrame
user_ratings = spark.createDataFrame(user_ratings_data, ["user_id", "rating_max_10"])

# Combine user_data and user_ratings DataFrames for users present in both
combined_data = user_data.join(user_ratings, "user_id", "inner")

# Show the combined DataFrame
combined_data.show()


23/09/04 15:51:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+----------+---------+----------+-------------+
|user_id|first_name|last_name|occupation|rating_max_10|
+-------+----------+---------+----------+-------------+
|      1|      John|      Doe|  Engineer|            8|
|      2|      Jane|    Smith|   Teacher|            9|
|      3|     Alice|  Johnson|    Doctor|           10|
+-------+----------+---------+----------+-------------+

