In [0]:
from pyspark.sql import SparkSession

# `spark` is not defined anywhere before this cell in the notebook, so create
# (or reuse) the session here; getOrCreate() returns the already-active session
# when the runtime has one, which keeps the cell safe on a fresh kernel.
spark = SparkSession.builder.appName('First Program in spark').getOrCreate()

# Load the phone-usage table and print its schema as a quick sanity check.
data = spark.read.table('phone_usage_india_csv_i_4_x_sl')
data.printSchema()

root
 |-- User ID: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Phone Brand: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- Screen Time (hrs/day): double (nullable = true)
 |-- Data Usage (GB/month): double (nullable = true)
 |-- Calls Duration (mins/day): double (nullable = true)
 |-- Number of Apps Installed: long (nullable = true)
 |-- Social Media Time (hrs/day): double (nullable = true)
 |-- E-commerce Spend (INR/month): long (nullable = true)
 |-- Streaming Time (hrs/day): double (nullable = true)
 |-- Gaming Time (hrs/day): double (nullable = true)
 |-- Monthly Recharge Cost (INR): long (nullable = true)
 |-- Primary Use: string (nullable = true)



In [0]:
# Build the Spark session (or reuse the active one) under a descriptive app name.
spark = (
    SparkSession.builder
    .appName('First Program in spark')
    .getOrCreate()
)

In [0]:
## Optimization operations
## Partitioning: divide the dataset into parts based on a rule (here, the value of a column)
##

In [0]:
# Show every distinct value of the Location column.
(
    data
    .select('Location')
    .distinct()
    .show()
)

+---------+
| Location|
+---------+
|  Lucknow|
|  Kolkata|
|Bangalore|
|  Chennai|
|   Jaipur|
|    Delhi|
|Ahmedabad|
|   Mumbai|
|     Pune|
|Hyderabad|
+---------+



In [0]:
## Partition based on a column
# Overwrite the managed table with one partition per Location value; column
# mapping mode "name" is set on the write as a Delta table option.
(
    data.write
    .option("delta.columnMapping.mode", "name")
    .partitionBy('Location')
    .mode('overwrite')
    .saveAsTable('workspace.default.locationbydata')
)

In [0]:
# Read back the partitioned table. The fully qualified name matches the one
# used by the write, so the lookup does not depend on the session's current
# catalog/schema.
data = spark.read.table('workspace.default.locationbydata')

In [0]:
## If the dataset is skewed on the partition column, the partitions will be very uneven in size and partitioning may not give the expected performance benefit

In [0]:
# Keep only the rows whose Location is exactly "Mumbai".
mumbai_data = data.where(data["Location"] == "Mumbai")

In [0]:
# List the column names of `data` (bare expression, so the notebook renders the list).
data.columns

['User ID',
 'Age',
 'Gender',
 'Location',
 'Phone Brand',
 'OS',
 'Screen Time (hrs/day)',
 'Data Usage (GB/month)',
 'Calls Duration (mins/day)',
 'Number of Apps Installed',
 'Social Media Time (hrs/day)',
 'E-commerce Spend (INR/month)',
 'Streaming Time (hrs/day)',
 'Gaming Time (hrs/day)',
 'Monthly Recharge Cost (INR)',
 'Primary Use']

In [0]:
## Average screen time for mumbai location
(
    mumbai_data
    .groupBy("Location")
    .agg({'Screen Time (hrs/day)': 'avg'})
    .show()
)

+--------+--------------------------+
|Location|avg(Screen Time (hrs/day))|
+--------+--------------------------+
|  Mumbai|        6.4210220673635305|
+--------+--------------------------+



In [0]:
from pyspark.sql.functions import broadcast
## broadcast() marks a DataFrame as small enough to be sent in full to every executor when it is used in a join

In [0]:
# Load the same source table twice so it can be joined with itself below.
data = spark.table('phone_usage_india_csv_i_4_x_sl')
data1 = spark.table('phone_usage_india_csv_i_4_x_sl')

In [0]:
# Give the right-hand copy distinct names for the two columns used after the
# join, so they can be referenced there without ambiguity.
data1 = (
    data1
    .withColumnRenamed("Screen Time (hrs/day)", "Screen Time")
    .withColumnRenamed("Location", "location1")
)

In [0]:
# No join condition is given, so this pairs every row of `data` with every row
# of `data1` (a Cartesian product). crossJoin states that intent explicitly and
# does not rely on implicit cross joins being enabled; broadcast() hints that
# `data1` should be copied in full to every executor.
data.crossJoin(broadcast(data1))

DataFrame[User ID: string, Age: bigint, Gender: string, Location: string, Phone Brand: string, OS: string, Screen Time (hrs/day): double, Data Usage (GB/month): double, Calls Duration (mins/day): double, Number of Apps Installed: bigint, Social Media Time (hrs/day): double, E-commerce Spend (INR/month): bigint, Streaming Time (hrs/day): double, Gaming Time (hrs/day): double, Monthly Recharge Cost (INR): bigint, Primary Use: string, User ID: string, Age: bigint, Gender: string, Location: string, Phone Brand: string, OS: string, Screen Time (hrs/day): double, Data Usage (GB/month): double, Calls Duration (mins/day): double, Number of Apps Installed: bigint, Social Media Time (hrs/day): double, E-commerce Spend (INR/month): bigint, Streaming Time (hrs/day): double, Gaming Time (hrs/day): double, Monthly Recharge Cost (INR): bigint, Primary Use: string]

In [0]:
## join when phone brand is same
# Inner join with `data1` hinted for broadcast; rows match on equal Phone Brand.
data_joined = data.join(
    broadcast(data1),
    on=data['Phone Brand'] == data1['Phone Brand'],
    how="inner",
)

In [0]:
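# broadcast(data1) hints Spark to ship a full copy of data1 to every executor, so the join can run without shuffling `data`.
# Note that data_joined itself is lazy and is not cached: each action on it re-runs the join unless it is explicitly persisted.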

In [0]:
# Average screen time per location over the joined rows.
(
    data_joined
    .groupBy("Location")
    .agg({'Screen Time (hrs/day)': 'avg'})
    .show()
)

+---------+--------------------------+
| Location|avg(Screen Time (hrs/day))|
+---------+--------------------------+
|  Lucknow|          6.57994686386791|
|  Kolkata|        6.4125828194997405|
|Bangalore|         6.575263746703633|
|  Chennai|         6.529346644204585|
|   Jaipur|        6.6547867883182334|
|    Delhi|         6.469169119619524|
|Ahmedabad|        6.5639787135415055|
|   Mumbai|         6.421639501541751|
|     Pune|         6.667959590053698|
|Hyderabad|        6.5931141273780405|
+---------+--------------------------+



In [0]:
# List the column names of the joined frame; columns from both join inputs are
# included, so names shared by the two sides appear twice.
data_joined.columns

['User ID',
 'Age',
 'Gender',
 'Location',
 'Phone Brand',
 'OS',
 'Screen Time (hrs/day)',
 'Data Usage (GB/month)',
 'Calls Duration (mins/day)',
 'Number of Apps Installed',
 'Social Media Time (hrs/day)',
 'E-commerce Spend (INR/month)',
 'Streaming Time (hrs/day)',
 'Gaming Time (hrs/day)',
 'Monthly Recharge Cost (INR)',
 'Primary Use',
 'User ID',
 'Age',
 'Gender',
 'location1',
 'Phone Brand',
 'OS',
 'Screen Time',
 'Data Usage (GB/month)',
 'Calls Duration (mins/day)',
 'Number of Apps Installed',
 'Social Media Time (hrs/day)',
 'E-commerce Spend (INR/month)',
 'Streaming Time (hrs/day)',
 'Gaming Time (hrs/day)',
 'Monthly Recharge Cost (INR)',
 'Primary Use']

In [0]:
## select only relevant columns using the select operation

In [0]:
# Keep only the three columns of interest, then preview the first rows.
data = (
    spark.read
    .table('phone_usage_india_csv_i_4_x_sl')
    .select("Gender", "Age", "Location")
)
data.show()

+------+---+---------+
|Gender|Age| Location|
+------+---+---------+
|  Male| 53|   Mumbai|
| Other| 60|    Delhi|
|Female| 37|Ahmedabad|
|  Male| 32|     Pune|
|  Male| 16|   Mumbai|
|  Male| 21|   Jaipur|
|Female| 57|  Lucknow|
| Other| 56|  Kolkata|
|Female| 46|  Kolkata|
| Other| 44|  Kolkata|
| Other| 55|  Lucknow|
|Female| 41|    Delhi|
|  Male| 53|Bangalore|
|  Male| 35|   Jaipur|
|Female| 33|   Jaipur|
|  Male| 52|Bangalore|
| Other| 46|   Mumbai|
|  Male| 54|  Kolkata|
|Female| 50|  Chennai|
| Other| 40|Hyderabad|
+------+---+---------+
only showing top 20 rows


In [0]:
# Print the extended plan output (logical plans as well as the physical plan).
data.explain(extended=True)

== Parsed Logical Plan ==
'Project ['Gender, 'Age, 'Location]
+- 'UnresolvedRelation [phone_usage_india_csv_i_4_x_sl], [], false

== Analyzed Logical Plan ==
Gender: string, Age: bigint, Location: string
Project [Gender#15859, Age#15858L, Location#15860]
+- SubqueryAlias workspace.default.phone_usage_india_csv_i_4_x_sl
   +- Relation workspace.default.phone_usage_india_csv_i_4_x_sl[User ID#15857,Age#15858L,Gender#15859,Location#15860,Phone Brand#15861,OS#15862,Screen Time (hrs/day)#15863,Data Usage (GB/month)#15864,Calls Duration (mins/day)#15865,Number of Apps Installed#15866L,Social Media Time (hrs/day)#15867,E-commerce Spend (INR/month)#15868L,Streaming Time (hrs/day)#15869,Gaming Time (hrs/day)#15870,Monthly Recharge Cost (INR)#15871L,Primary Use#15872] parquet

== Optimized Logical Plan ==
Project [Gender#15859, Age#15858L, Location#15860]
+- Relation workspace.default.phone_usage_india_csv_i_4_x_sl[User ID#15857,Age#15858L,Gender#15859,Location#15860,Phone Brand#15861,OS#15862,Sc