In [1]:
import pyspark
from pyspark.sql import SparkSession #Necessary for initializing pyspark
from pyspark.sql.functions import *

In [3]:
# Creating a SparkSession
spark = (SparkSession.builder.appName("SparkSQLExample").getOrCreate())

# Path to data set
csv_file = "onlinefoods.csv"

df = spark.read \
    .format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(csv_file)


# Creating a temporary view to run SQL Queries
df.createOrReplaceTempView("u") 

In [4]:
df.show()

+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+
|Age|Gender|Marital Status|    Occupation| Monthly Income|Educational Qualifications|Family size|latitude|longitude|Pin code|Output| Feedback|_c12|
+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+
| 20|Female|        Single|       Student|      No Income|             Post Graduate|          4| 12.9766|  77.5993|  560001|   Yes| Positive| Yes|
| 24|Female|        Single|       Student| Below Rs.10000|                  Graduate|          3|  12.977|  77.5773|  560009|   Yes| Positive| Yes|
| 22|  Male|        Single|       Student| Below Rs.10000|             Post Graduate|          3| 12.9551|  77.6593|  560017|   Yes|Negative | Yes|
| 22|Female|        Single|       Student|      No Income|                  Graduate|          6| 12.9473|  77.5

In [5]:
df.describe().show()

+-------+------------------+------+--------------+----------+--------------+--------------------------+------------------+--------------------+------------------+------------------+------+---------+----+
|summary|               Age|Gender|Marital Status|Occupation|Monthly Income|Educational Qualifications|       Family size|            latitude|         longitude|          Pin code|Output| Feedback|_c12|
+-------+------------------+------+--------------+----------+--------------+--------------------------+------------------+--------------------+------------------+------------------+------+---------+----+
|  count|               388|   388|           388|       388|           388|                       388|               388|                 388|               388|               388|   388|      388| 388|
|   mean|24.628865979381445|  NULL|          NULL|      NULL|          NULL|                      NULL|3.2809278350515463|  12.972057989690706| 77.60015953608251| 560040.1134020619|  N

In [6]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Monthly Income: string (nullable = true)
 |-- Educational Qualifications: string (nullable = true)
 |-- Family size: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- Pin code: integer (nullable = true)
 |-- Output: string (nullable = true)
 |-- Feedback: string (nullable = true)
 |-- _c12: string (nullable = true)



In [7]:
df.isEmpty()

False

In [8]:
df.columns

['Age',
 'Gender',
 'Marital Status',
 'Occupation',
 'Monthly Income',
 'Educational Qualifications',
 'Family size',
 'latitude',
 'longitude',
 'Pin code',
 'Output',
 'Feedback',
 '_c12']

In [9]:
df.count()

388

In [10]:
df.head(10)

[Row(Age=20, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='No Income', Educational Qualifications='Post Graduate', Family size=4, latitude=12.9766, longitude=77.5993, Pin code=560001, Output='Yes', Feedback='Positive', _c12='Yes'),
 Row(Age=24, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Graduate', Family size=3, latitude=12.977, longitude=77.5773, Pin code=560009, Output='Yes', Feedback='Positive', _c12='Yes'),
 Row(Age=22, Gender='Male', Marital Status='Single', Occupation='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Post Graduate', Family size=3, latitude=12.9551, longitude=77.6593, Pin code=560017, Output='Yes', Feedback='Negative ', _c12='Yes'),
 Row(Age=22, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='No Income', Educational Qualifications='Graduate', Family size=6, latitude=12.9473, longitude=77.5616, Pin

In [11]:
new_df = df.select('Age', 'Monthly Income')

In [12]:
new_df.show()

+---+---------------+
|Age| Monthly Income|
+---+---------------+
| 20|      No Income|
| 24| Below Rs.10000|
| 22| Below Rs.10000|
| 22|      No Income|
| 22| Below Rs.10000|
| 27|More than 50000|
| 22|      No Income|
| 24|      No Income|
| 23|      No Income|
| 23|      No Income|
| 22|      No Income|
| 23| Below Rs.10000|
| 23|      No Income|
| 21|      No Income|
| 23| 10001 to 25000|
| 24|      No Income|
| 28| 25001 to 50000|
| 23|      No Income|
| 25|      No Income|
| 21| Below Rs.10000|
+---+---------------+
only showing top 20 rows



In [13]:
first_two_rows = df.take(2)

# Print the result
for row in first_two_rows:
    print(row)

Row(Age=20, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='No Income', Educational Qualifications='Post Graduate', Family size=4, latitude=12.9766, longitude=77.5993, Pin code=560001, Output='Yes', Feedback='Positive', _c12='Yes')
Row(Age=24, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Graduate', Family size=3, latitude=12.977, longitude=77.5773, Pin code=560009, Output='Yes', Feedback='Positive', _c12='Yes')


In [17]:
first_row = df.first()

# Convert row to dictionary
row_dict = first_row.asDict()

row_dict

{'Age': 20,
 'Gender': 'Female',
 'Marital Status': 'Single',
 'Occupation': 'Student',
 'Monthly Income': 'No Income',
 'Educational Qualifications': 'Post Graduate',
 'Family size': 4,
 'latitude': 12.9766,
 'longitude': 77.5993,
 'Pin code': 560001,
 'Output': 'Yes',
 'Feedback': 'Positive',
 '_c12': 'Yes'}

In [19]:
# Adding column based on some condition
df = df.withColumn("Post_Graduate_Customer", when(df["Educational Qualifications"] == "Post Graduate", "Yes").otherwise("No"))

df.show()

+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
|Age|Gender|Marital Status|    Occupation| Monthly Income|Educational Qualifications|Family size|latitude|longitude|Pin code|Output| Feedback|_c12|Post_Graduate_Customer|
+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
| 20|Female|        Single|       Student|      No Income|             Post Graduate|          4| 12.9766|  77.5993|  560001|   Yes| Positive| Yes|                   Yes|
| 24|Female|        Single|       Student| Below Rs.10000|                  Graduate|          3|  12.977|  77.5773|  560009|   Yes| Positive| Yes|                    No|
| 22|  Male|        Single|       Student| Below Rs.10000|             Post Graduate|          3| 12.9551|  77.6593|  560017|   Yes|Negative | Ye

In [20]:
df = df.withColumnRenamed("Occupation", "Job Status")

df.show()

+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
|Age|Gender|Marital Status|    Job Status| Monthly Income|Educational Qualifications|Family size|latitude|longitude|Pin code|Output| Feedback|_c12|Post_Graduate_Customer|
+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
| 20|Female|        Single|       Student|      No Income|             Post Graduate|          4| 12.9766|  77.5993|  560001|   Yes| Positive| Yes|                   Yes|
| 24|Female|        Single|       Student| Below Rs.10000|                  Graduate|          3|  12.977|  77.5773|  560009|   Yes| Positive| Yes|                    No|
| 22|  Male|        Single|       Student| Below Rs.10000|             Post Graduate|          3| 12.9551|  77.6593|  560017|   Yes|Negative | Ye

In [21]:
# Dropping column
df.drop("Post_Graduate_Customer")

DataFrame[Age: int, Gender: string, Marital Status: string, Job Status: string, Monthly Income: string, Educational Qualifications: string, Family size: int, latitude: double, longitude: double, Pin code: int, Output: string, Feedback: string, _c12: string]

In [22]:
# using distinct() to remove duplicates
distinct_df = df.distinct()

print("DataFrame after removing duplicates")
distinct_df.show()

DataFrame after removing duplicates
+---+------+-----------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
|Age|Gender|   Marital Status|    Job Status| Monthly Income|Educational Qualifications|Family size|latitude|longitude|Pin code|Output| Feedback|_c12|Post_Graduate_Customer|
+---+------+-----------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
| 23|  Male|           Single|      Employee| 10001 to 25000|             Post Graduate|          2|  12.985|  77.5533|  560010|   Yes| Positive| Yes|                   Yes|
| 26|Female|           Single|Self Employeed| 25001 to 50000|             Post Graduate|          3| 12.9635|  77.5821|  560002|   Yes| Positive| Yes|                   Yes|
| 29|  Male|           Single|      Employee| 25001 to 50000|                  Graduate|      

In [25]:
# Example data
data = [("Alice", 34, "Single"),
        ("Bob", 45, "Married"),
        ("Charlie", 28, "Single")]

columns = ["Name", "Age", "MaritalStatus"]
df_new_data = spark.createDataFrame(data,columns)

# Register the dataFrame as temporary view
df_new_data.createOrReplaceTempView("people")

# Executing sql query using spark sql
result = spark.sql("Select * from people where Age > 30")

result.show()

+-----+---+-------------+
| Name|Age|MaritalStatus|
+-----+---+-------------+
|Alice| 34|       Single|
|  Bob| 45|      Married|
+-----+---+-------------+



In [33]:
# Register the dataFrame as temporary view
df.createOrReplaceTempView("food")

# Executing sql query using spark sql
result = spark.sql("Select * from food where Age > 31")

result.show()

+---+------+-----------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
|Age|Gender|   Marital Status|    Job Status| Monthly Income|Educational Qualifications|Family size|latitude|longitude|Pin code|Output| Feedback|_c12|Post_Graduate_Customer|
+---+------+-----------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
| 32|Female|Prefer not to say|    House wife|      No Income|                  Graduate|          5|  12.982|  77.6256|  560008|   Yes|Negative | Yes|                    No|
| 32|Female|          Married|    House wife|      No Income|                Uneducated|          3|  13.014|  77.5658|  560012|    No| Positive|  No|                    No|
| 32|Female|          Married|      Employee| 25001 to 50000|                  Graduate|          5| 12.9261|  77.6221|  560034|  

In [34]:
df.describe().show()

+-------+------------------+------+--------------+----------+--------------+--------------------------+------------------+--------------------+------------------+------------------+------+---------+----+----------------------+
|summary|               Age|Gender|Marital Status|Job Status|Monthly Income|Educational Qualifications|       Family size|            latitude|         longitude|          Pin code|Output| Feedback|_c12|Post_Graduate_Customer|
+-------+------------------+------+--------------+----------+--------------+--------------------------+------------------+--------------------+------------------+------------------+------+---------+----+----------------------+
|  count|               388|   388|           388|       388|           388|                       388|               388|                 388|               388|               388|   388|      388| 388|                   388|
|   mean|24.628865979381445|  NULL|          NULL|      NULL|          NULL|                

In [37]:
filtered_df.dtypes

[('Age', 'int'),
 ('Gender', 'string'),
 ('Marital Status', 'string'),
 ('Job Status', 'string'),
 ('Monthly Income', 'string'),
 ('Educational Qualifications', 'string'),
 ('Family size', 'int'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('Pin code', 'int'),
 ('Output', 'string'),
 ('Feedback', 'string'),
 ('_c12', 'string'),
 ('Post_Graduate_Customer', 'string')]

In [38]:
filtered_df = df.filter((col("Age") > 25) & (col("Marital Status") == "Single"))

filtered_df.show()

+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
|Age|Gender|Marital Status|    Job Status| Monthly Income|Educational Qualifications|Family size|latitude|longitude|Pin code|Output| Feedback|_c12|Post_Graduate_Customer|
+---+------+--------------+--------------+---------------+--------------------------+-----------+--------+---------+--------+------+---------+----+----------------------+
| 28|Female|        Single|      Employee| 25001 to 50000|             Post Graduate|          2| 12.9783|  77.6408|  560038|   Yes| Positive| Yes|                   Yes|
| 26|  Male|        Single|       Student|      No Income|             Post Graduate|          4| 13.0019|  77.5713|  560003|   Yes| Positive| Yes|                   Yes|
| 26|  Male|        Single|       Student|      No Income|             Post Graduate|          4| 12.9048|  77.6821|  560036|   Yes| Positive| Ye

In [39]:
# using collect() method to retrieve all rows as a list
collected_data = df.collect()

print(collected_data)

[Row(Age=20, Gender='Female', Marital Status='Single', Job Status='Student', Monthly Income='No Income', Educational Qualifications='Post Graduate', Family size=4, latitude=12.9766, longitude=77.5993, Pin code=560001, Output='Yes', Feedback='Positive', _c12='Yes', Post_Graduate_Customer='Yes'), Row(Age=24, Gender='Female', Marital Status='Single', Job Status='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Graduate', Family size=3, latitude=12.977, longitude=77.5773, Pin code=560009, Output='Yes', Feedback='Positive', _c12='Yes', Post_Graduate_Customer='No'), Row(Age=22, Gender='Male', Marital Status='Single', Job Status='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Post Graduate', Family size=3, latitude=12.9551, longitude=77.6593, Pin code=560017, Output='Yes', Feedback='Negative ', _c12='Yes', Post_Graduate_Customer='Yes'), Row(Age=22, Gender='Female', Marital Status='Single', Job Status='Student', Monthly Income='No Income', Educatio

Using GroupBy Function

In [40]:
marital_status_counts = df.groupBy("Marital Status").agg(count("*").alias("Count"))

marital_status_counts.show()

+-----------------+-----+
|   Marital Status|Count|
+-----------------+-----+
|Prefer not to say|   12|
|          Married|  108|
|           Single|  268|
+-----------------+-----+



Applying Inner Join

In [42]:
# Example data
data1 = [("Alice", 34, "Single"),
        ("Bob", 45, "Married"),
        ("Charlie", 28, "Single")]

columns1 = ["Name", "Age", "MaritalStatus"]
df1 = spark.createDataFrame(data1,columns1)

data2 = [("Alice", "New York"), 
         ("Bob", "Los Angeles"),
         ("Charlie", "Chicago")]

columns2 = ["Name","City"]
df2 = spark.createDataFrame(data2,columns2)

# Joining the two DataFrames based on the 'Name' column
joined_df = df1.join(df2, "Name", "inner") 

# Printing the joined DataFrame
joined_df.show()

+-------+---+-------------+-----------+
|   Name|Age|MaritalStatus|       City|
+-------+---+-------------+-----------+
|  Alice| 34|       Single|   New York|
|    Bob| 45|      Married|Los Angeles|
|Charlie| 28|       Single|    Chicago|
+-------+---+-------------+-----------+



Applying Cross Join

In [44]:
# Example data
data1 = [("Alice", 34, "Single"),
        ("Bob", 45, "Married"),
        ("Charlie", 28, "Single")]

columns1 = ["Name", "Age", "MaritalStatus"]
df1 = spark.createDataFrame(data1,columns1)

data2 = [("New York",), 
         ("Los Angeles",),
         ("Chicago",)]

columns2 = ["City"]
df2 = spark.createDataFrame(data2,columns2)

# Joining the two DataFrames based on the 'Name' column
cross_joined_df = df1.crossJoin(df2.select("*")) 

# Printing the joined DataFrame
cross_joined_df.show()

+-------+---+-------------+-----------+
|   Name|Age|MaritalStatus|       City|
+-------+---+-------------+-----------+
|  Alice| 34|       Single|   New York|
|  Alice| 34|       Single|Los Angeles|
|  Alice| 34|       Single|    Chicago|
|    Bob| 45|      Married|   New York|
|    Bob| 45|      Married|Los Angeles|
|    Bob| 45|      Married|    Chicago|
|Charlie| 28|       Single|   New York|
|Charlie| 28|       Single|Los Angeles|
|Charlie| 28|       Single|    Chicago|
+-------+---+-------------+-----------+



Applying Broadcast

In [45]:
# Example data
data1 = [("Alice", 34),
        ("Bob", 45),
        ("Charlie", 28)]

columns1 = ["Name", "Age"]
df1 = spark.createDataFrame(data1,columns1)

data2 = [("Alice", "New York"), 
         ("Bob", "Los Angeles"),
         ("Charlie", "Chicago"),
         ("David", "Houston"),
         ("Eve", "San Francisco")]

columns2 = ["Name","City"]
df2 = spark.createDataFrame(data2,columns2)

# Broadcast the smaller dataFrame before joining 
# (Always try to broadcast the smaller dataset -> Coz it will be copied to every executor's memory and occupy space there)
broadcast_df1 = broadcast(df1)

# Performing a join between the two dataFrames
joined_df = broadcast_df1.join(df2, "Name", "inner")

# Printing the joined DataFrame
joined_df.show()

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|  Alice| 34|   New York|
|    Bob| 45|Los Angeles|
|Charlie| 28|    Chicago|
+-------+---+-----------+



In [46]:
# Alias -> Giving name to a data frame
df_alias = df.alias("Person")

Applying Rollup

In [48]:
# Example data
data = [("Alice", "Sales",5000),
        ("Bob","Sales",6000),
        ("Charlie","Marketing",4000),
        ("David","Marketing",3000)]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data,columns)

# performing rollup aggregation across 'department' and 'name'
rollup_df = df.rollup("Department", "Name").sum("Salary")

# showing the results
rollup_df.show()

+----------+-------+-----------+
|Department|   Name|sum(Salary)|
+----------+-------+-----------+
|      NULL|   NULL|      18000|
|     Sales|  Alice|       5000|
|     Sales|   NULL|      11000|
|     Sales|    Bob|       6000|
| Marketing|   NULL|       7000|
| Marketing|Charlie|       4000|
| Marketing|  David|       3000|
+----------+-------+-----------+



Cube

In [50]:
# Example Data
data = [("Alice", "Sales",5000),
        ("Bob","Sales",6000),
        ("Charlie","Marketing",4000),
        ("David","Marketing",3000)]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data,columns)

# perform Cube Aggregation across department and Name
cube_df = df.cube("Department", "Name").sum("Salary")

# Show results
cube_df.show()

+----------+-------+-----------+
|Department|   Name|sum(Salary)|
+----------+-------+-----------+
|      NULL|  Alice|       5000|
|      NULL|   NULL|      18000|
|     Sales|  Alice|       5000|
|     Sales|   NULL|      11000|
|     Sales|    Bob|       6000|
|      NULL|    Bob|       6000|
| Marketing|   NULL|       7000|
| Marketing|Charlie|       4000|
|      NULL|Charlie|       4000|
|      NULL|  David|       3000|
| Marketing|  David|       3000|
+----------+-------+-----------+



Pivot

In [51]:
# Example Data
data = [("Alice", "Sales",5000),
        ("Bob","Sales",6000),
        ("Charlie","Marketing",4000),
        ("David","Marketing",3000)]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data,columns)

# perform Cube Aggregation across department and Name
pivot_df = df.groupby("Name").pivot("Department").sum("Salary")

# Show results
pivot_df.show()

+-------+---------+-----+
|   Name|Marketing|Sales|
+-------+---------+-----+
|Charlie|     4000| NULL|
|    Bob|     NULL| 6000|
|  Alice|     NULL| 5000|
|  David|     3000| NULL|
+-------+---------+-----+



Sort

In [53]:
# Example data
data = [("Alice", 34, "Single"),
        ("Bob", 45, "Married"),
        ("Charlie", 28, "Single")]

columns = ["Name", "Age", "MaritalStatus"]
df = spark.createDataFrame(data,columns)

sorted_df = df.sort("Age") #orderby will also give the same result

sorted_df.show()

+-------+---+-------------+
|   Name|Age|MaritalStatus|
+-------+---+-------------+
|Charlie| 28|       Single|
|  Alice| 34|       Single|
|    Bob| 45|      Married|
+-------+---+-------------+



Partition (Double check it's result)

In [55]:
# Example data
data = [("Alice", 34, "Single"),
        ("Bob", 45, "Married"),
        ("Charlie", 28, "Single")]

columns = ["Name", "Age", "MaritalStatus"]
df = spark.createDataFrame(data,columns)

# Repartitioning the data frame into 2 partitions for demonstration
df = df.repartition(2)

sorted_within_partitions_df = df.sortWithinPartitions("Age") 

sorted_within_partitions_df.show()

+-------+---+-------------+
|   Name|Age|MaritalStatus|
+-------+---+-------------+
|  Alice| 34|       Single|
|    Bob| 45|      Married|
|Charlie| 28|       Single|
+-------+---+-------------+



Dropping Duplicates

In [56]:
# Example data
data = [("Alice", 34, "Single"),
        ("Bob", 45, "Married"),
        ("Bob", 45, "Married"),
        ("Charlie", 28, "Single")]

columns = ["Name", "Age", "MaritalStatus"]
df = spark.createDataFrame(data,columns)

# Repartitioning the data frame into 2 partitions for demonstration
deduplicated_df = df.dropDuplicates()

deduplicated_df.show()

+-------+---+-------------+
|   Name|Age|MaritalStatus|
+-------+---+-------------+
|  Alice| 34|       Single|
|    Bob| 45|      Married|
|Charlie| 28|       Single|
+-------+---+-------------+

