In [None]:
# Step 1: Install PySpark in Colab
!pip install pyspark

# Step 2: Initialize Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EV Sales Analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Step 3: Load the EV Dataset
# Replace the path with the actual path of the EV dataset if needed
ev_df = spark.read.csv('/content/EV_Dataset.csv', header=True, inferSchema=True)
ev_df.show(5)
ev_df.printSchema()




+------+----------+--------+--------------+--------------------+----------------+------------+-----------------+
|  Year|Month_Name|    Date|         State|       Vehicle_Class|Vehicle_Category|Vehicle_Type|EV_Sales_Quantity|
+------+----------+--------+--------------+--------------------+----------------+------------+-----------------+
|2014.0|       jan|1/1/2014|Andhra Pradesh|     ADAPTED VEHICLE|          Others|      Others|              0.0|
|2014.0|       jan|1/1/2014|Andhra Pradesh|AGRICULTURAL TRACTOR|          Others|      Others|              0.0|
|2014.0|       jan|1/1/2014|Andhra Pradesh|           AMBULANCE|          Others|      Others|              0.0|
|2014.0|       jan|1/1/2014|Andhra Pradesh| ARTICULATED VEHICLE|          Others|      Others|              0.0|
|2014.0|       jan|1/1/2014|Andhra Pradesh|                 BUS|             Bus|         Bus|              0.0|
+------+----------+--------+--------------+--------------------+----------------+------------+--

In [None]:
# Demo Code 1 - Test basic RDD creation and display
# parallelize() builds an RDD on the cluster; toDF() converts it to a DataFrame so it can be show()n.
# Keeping the two stages in separately named variables avoids calling a DataFrame "rdd".
demo_rdd = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'), (4, 5, 6, 'd e f'), (7, 8, 9, 'g h i')])
demo_df = demo_rdd.toDF(['col1', 'col2', 'col3', 'col4'])
demo_df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



In [None]:
# Demo Code 2 - Test reading an external CSV file (Advertising.csv)
# Use Spark's built-in CSV reader, the same call style as the EV dataset load.
# 'com.databricks.spark.csv' is only a legacy alias left over from the old external spark-csv package.
adv_df = spark.read.csv('/content/Advertising.csv', header=True, inferSchema=True)
adv_df.show(5)
adv_df.printSchema()

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows

root
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [None]:
# Task 1: Transformations
# 1. map(func): Example mapping each state with increased sales quantity by 10%
SALES_UPLIFT = 1.1  # multiplier for a +10% increase

# Columns read from CSV may contain nulls; propagate None instead of raising
# TypeError on `None * 1.1` inside the executor.
mapped_rdd = ev_df.rdd.map(
    lambda row: (
        row['State'],
        None if row['EV_Sales_Quantity'] is None else row['EV_Sales_Quantity'] * SALES_UPLIFT,
    )
)
mapped_rdd.toDF(['State', 'Updated_Sales']).show(5)

+--------------+-------------+
|         State|Updated_Sales|
+--------------+-------------+
|Andhra Pradesh|          0.0|
|Andhra Pradesh|          0.0|
|Andhra Pradesh|          0.0|
|Andhra Pradesh|          0.0|
|Andhra Pradesh|          0.0|
+--------------+-------------+
only showing top 5 rows



In [None]:
# 2. filter(func): Filter for only two-wheeler vehicle types
# Two-wheelers are encoded in Vehicle_Type with a '2W' prefix ('2W_Personal', '2W_Shared';
# see the distinct() output further down). No row has the literal value 'Two Wheeler',
# so an equality match on that string always returned an empty result.
filtered_df = ev_df.filter(ev_df['Vehicle_Type'].startswith('2W'))
filtered_df.show(5)

+----+----------+----+-----+-------------+----------------+------------+-----------------+
|Year|Month_Name|Date|State|Vehicle_Class|Vehicle_Category|Vehicle_Type|EV_Sales_Quantity|
+----+----------+----+-----+-------------+----------------+------------+-----------------+
+----+----------+----+-----+-------------+----------------+------------+-----------------+



In [None]:
# 3. flatMap(func): Example splitting state names (each word as a separate row)
# Each input row yields zero or more one-element tuples, one per word in its State.
# `or ''` turns a null State into an empty string, which splits into no words
# instead of raising AttributeError on None.split().
flat_mapped_rdd = ev_df.rdd.flatMap(
    lambda row: [(word,) for word in (row['State'] or '').split()]
)
flat_mapped_df = flat_mapped_rdd.toDF(["Words"])
flat_mapped_df.show(5)

+-------+
|  Words|
+-------+
| Andhra|
|Pradesh|
| Andhra|
|Pradesh|
| Andhra|
+-------+
only showing top 5 rows



In [None]:
# 4. distinct(): Retrieve distinct vehicle types
# Project down to the single column first so de-duplication considers Vehicle_Type only.
distinct_vehicle_types = (
    ev_df
    .select("Vehicle_Type")
    .distinct()
)
distinct_vehicle_types.show()

+------------------+
|      Vehicle_Type|
+------------------+
|         3W_Shared|
|          3W_Goods|
|   Institution Bus|
|       3W_Personal|
|       2W_Personal|
|            Others|
| 3W_Goods_LowSpeed|
|       4W_Personal|
|               Bus|
|3W_Shared_LowSpeed|
|         2W_Shared|
|         4W_Shared|
+------------------+



In [None]:
# 5. union(otherRDD): Union of EV and Advertising dataset for demo (structure demonstration)
# union() matches columns by POSITION and keeps the left DataFrame's column names, so
# renaming only the right-hand side had no effect on the result header.
# Alias both sides to the same name, and cast TV (double) to string explicitly so the
# result type does not depend on Spark's implicit type-coercion rules.
union_df = ev_df.select(ev_df["State"].alias("Common_Field")).union(
    adv_df.select(adv_df["TV"].cast("string").alias("Common_Field"))
)
union_df.show(5)

+--------------+
|         State|
+--------------+
|Andhra Pradesh|
|Andhra Pradesh|
|Andhra Pradesh|
|Andhra Pradesh|
|Andhra Pradesh|
+--------------+
only showing top 5 rows



In [None]:
# 6. intersection(otherRDD): Example intersection (similar values between state and TV columns)
# Cast TV (double) to string explicitly so both sides share a type instead of relying on
# implicit coercion in the set operation. Numeric TV values never match a state name,
# so an empty result is expected; this cell only demonstrates the API.
intersect_df = ev_df.select("State").intersect(
    adv_df.select(adv_df["TV"].cast("string").alias("State"))
)
intersect_df.show()

+-----+
|State|
+-----+
+-----+



In [None]:
# 7. subtract(otherRDD): Example difference (EV states not in Advertising CSV's TV column)
# DataFrame.subtract is a set difference (duplicates removed), so each state appears once.
# TV (double) is cast to string explicitly so both sides share a type instead of relying
# on implicit coercion in the set operation.
subtracted_df = ev_df.select("State").subtract(
    adv_df.select(adv_df["TV"].cast("string").alias("State"))
)
subtracted_df.show()

+-----------------+
|            State|
+-----------------+
|         Nagaland|
|        Karnataka|
|           Odisha|
|           Kerala|
|           Ladakh|
|       Tamil Nadu|
|     Chhattisgarh|
|   Andhra Pradesh|
|   Madhya Pradesh|
|       DNH and DD|
|           Punjab|
|          Manipur|
|              Goa|
|          Mizoram|
| Himachal Pradesh|
|       Puducherry|
|          Haryana|
|Jammu and Kashmir|
|        Jharkhand|
|Arunachal Pradesh|
+-----------------+
only showing top 20 rows



In [None]:
# 8. groupByKey(): Group sales quantity by state
# Turn every row into a (State, EV_Sales_Quantity) pair so key-based RDD operations apply.
# sales_rdd is reused by the reduceByKey and countByKey cells below.
sales_rdd = ev_df.rdd.map(lambda r: (r['State'], r['EV_Sales_Quantity']))

# groupByKey yields an iterable per key; materialise it as a list so toDF infers an array column.
grouped_sales = sales_rdd.groupByKey().mapValues(list)

grouped_sales_df = grouped_sales.toDF(["State", "Sales_List"])
grouped_sales_df.show(5)

+-----------------+--------------------+
|            State|          Sales_List|
+-----------------+--------------------+
|   Andhra Pradesh|[0.0, 0.0, 0.0, 0...|
|Arunachal Pradesh|[0.0, 0.0, 0.0, 0...|
|            Assam|[0.0, 0.0, 0.0, 0...|
|            Bihar|[0.0, 0.0, 0.0, 0...|
|       Chandigarh|[0.0, 0.0, 0.0, 0...|
+-----------------+--------------------+
only showing top 5 rows



In [None]:
# 9. reduceByKey(func): Sum of EV sales by state
# Null quantities are treated as 0 so the per-key sum cannot fail with `None + float`.
# mapValues keeps the keys (and partitioning) unchanged.
total_sales = (
    sales_rdd
    .mapValues(lambda qty: 0.0 if qty is None else qty)
    .reduceByKey(lambda x, y: x + y)
)
total_sales_df = total_sales.toDF(["State", "Total_Sales"])
total_sales_df.show(5)

+-----------------+-----------+
|            State|Total_Sales|
+-----------------+-----------+
|   Andhra Pradesh|    77356.0|
|Arunachal Pradesh|       40.0|
|            Assam|   151917.0|
|            Bihar|   213465.0|
|       Chandigarh|    11453.0|
+-----------------+-----------+
only showing top 5 rows



In [None]:
# 10. sortBy(func): Sort by EV sales quantity in descending order
# Column.desc() is the expression form of orderBy(..., ascending=False).
sorted_df = ev_df.orderBy(ev_df["EV_Sales_Quantity"].desc())
sorted_df.show(5)

+------+----------+---------+-------------+---------------+----------------+------------------+-----------------+
|  Year|Month_Name|     Date|        State|  Vehicle_Class|Vehicle_Category|      Vehicle_Type|EV_Sales_Quantity|
+------+----------+---------+-------------+---------------+----------------+------------------+-----------------+
|2023.0|       aug| 8/1/2023|Uttar Pradesh|  E-RICKSHAW(P)|      3-Wheelers|3W_Shared_LowSpeed|          20584.0|
|2023.0|       dec|12/1/2023|Uttar Pradesh|  E-RICKSHAW(P)|      3-Wheelers|3W_Shared_LowSpeed|          20352.0|
|2023.0|       may| 5/1/2023|  Maharashtra|M-CYCLE/SCOOTER|      2-Wheelers|       2W_Personal|          19908.0|
|2023.0|       sep| 9/1/2023|Uttar Pradesh|  E-RICKSHAW(P)|      3-Wheelers|3W_Shared_LowSpeed|          19486.0|
|2023.0|       jul| 7/1/2023|Uttar Pradesh|  E-RICKSHAW(P)|      3-Wheelers|3W_Shared_LowSpeed|          18769.0|
+------+----------+---------+-------------+---------------+----------------+------------

In [None]:
# Task 2: Actions
# 1. collect(): Collect all data to driver as a list (use with caution on large datasets)
# Only 5 rows are displayed, so limit before collecting rather than pulling the whole
# dataset (~97k rows, per count() below) into driver memory.
collected_data = ev_df.limit(5).collect()
print("Collected Data:", collected_data)

Collected Data: [Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='ADAPTED VEHICLE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='AGRICULTURAL TRACTOR', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='AMBULANCE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='ARTICULATED VEHICLE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='BUS', Vehicle_Category='Bus', Vehicle_Type='Bus', EV_Sales_Quantity=0.0)]


In [None]:
# 2. count(): Count number of rows in the dataset
# count() is an action: it triggers a job over the data and returns the row total to the driver.
row_count = ev_df.count()
print(f"Row Count: {row_count}")

Row Count: 96845


In [None]:
# 3. take(n): Retrieve first n rows
# take() returns a Python list of Row objects on the driver.
top_rows = ev_df.take(num=5)
print(f"Top 5 Rows: {top_rows}")

Top 5 Rows: [Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='ADAPTED VEHICLE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='AGRICULTURAL TRACTOR', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='AMBULANCE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='ARTICULATED VEHICLE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0), Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='BUS', Vehicle_Category='Bus', Vehicle_Type='Bus', EV_Sales_Quantity=0.0)]


In [None]:
# 4. first(): Retrieve the first row
# Unlike take(1), first() returns a single Row object rather than a list.
first_row = ev_df.first()
print(f"First Row: {first_row}")

First Row: Row(Year=2014.0, Month_Name='jan', Date='1/1/2014', State='Andhra Pradesh', Vehicle_Class='ADAPTED VEHICLE', Vehicle_Category='Others', Vehicle_Type='Others', EV_Sales_Quantity=0.0)


In [None]:
# 5. foreach(func): Apply function on each element for side effect
# foreach runs on the executors: a print() inside it writes to executor stdout/logs,
# not to this notebook (which is why the original printing version showed no output),
# and would emit one line per row of the dataset.
# An accumulator is the supported way to observe a foreach side effect on the driver.
rows_seen = spark.sparkContext.accumulator(0)
ev_df.foreach(lambda row: rows_seen.add(1))
print("Rows visited by foreach:", rows_seen.value)

In [None]:
# 6. reduce(func): Example of reducing EV_Sales_Quantity with summation
# Null quantities become 0.0 so the reduce cannot fail with `None + float`.
# Note: reduce() raises on an empty RDD; fold(0.0, ...) would be the empty-safe variant.
total_sales_quantity = (
    ev_df.rdd
    .map(lambda row: row['EV_Sales_Quantity'] or 0.0)
    .reduce(lambda x, y: x + y)
)
print("Total Sales Quantity:", total_sales_quantity)

# 7. saveAsTextFile(path): Save the RDD as text files, one str(Row) line per element
#    (uncomment to use and specify a path). DataFrameWriter.text() cannot be used on
#    ev_df directly: the text source accepts only a single string column.
# ev_df.rdd.saveAsTextFile("/content/ev_sales_text")

# 8. saveAsSequenceFile(path): Save a key-value RDD as a Hadoop SequenceFile.
#    total_sales holds scalar (State, Total_Sales) pairs, which convert cleanly to Hadoop
#    Writables; list-valued pairs (as in grouped_sales) may fail that conversion.
# total_sales.saveAsSequenceFile("/content/total_sales_sequence")


Total Sales Quantity: 3593811.0


In [None]:
# 9. countByKey(): Count sales occurrences by state
# countByKey() returns a dict-like mapping on the driver (number of rows per State key);
# convert it to a plain dict for display.
state_sales_count = sales_rdd.countByKey()
print("Sales Count by State:", dict(state_sales_count))

# 10. Save as Parquet through the DataFrameWriter. The legacy DataFrame.saveAsParquetFile
#     method no longer exists in current PySpark; write.parquet replaces it.
# ev_df.write.mode('overwrite').parquet("/content/ev_sales_parquet")

# Stop the Spark session at the end of the assignment; later cells cannot use `spark` after this.
spark.stop()

Sales Count by State: {'Andhra Pradesh': 3457, 'Arunachal Pradesh': 2285, 'Assam': 3114, 'Andaman & Nicobar Island': 1226, 'Bihar': 2544, 'Chhattisgarh': 3590, 'Chandigarh': 1554, 'DNH and DD': 1927, 'Delhi': 1871, 'Goa': 2139, 'Gujarat': 4517, 'Himachal Pradesh': 2980, 'Haryana': 3842, 'Jharkhand': 2773, 'Karnataka': 4830, 'Kerala': 3666, 'Ladakh': 1063, 'Maharashtra': 4912, 'Meghalaya': 1867, 'Madhya Pradesh': 3587, 'Mizoram': 1557, 'Nagaland': 1588, 'Odisha': 4027, 'Punjab': 2950, 'Puducherry': 1832, 'Rajasthan': 4552, 'Sikkim': 1246, 'Tamil Nadu': 4063, 'Tripura': 1564, 'Uttarakhand': 3045, 'Uttar Pradesh': 4557, 'West Bengal': 4196, 'Jammu and Kashmir': 2292, 'Manipur': 1632}
