In [1]:
#RDD

In [2]:
from pyspark import SparkContext

In [3]:
sc=SparkContext()

In [7]:
raw = sc.textFile('/content/data.csv')


In [11]:
# Remove header safely
header = raw.first()
rdd = raw.filter(lambda line: line != header)

In [12]:
# If your CSV has commas inside quotes, prefer a robust split:
# Simple split (fast, assumes no quoted commas):
split_rdd = rdd.map(lambda line: [c.strip() for c in line.split(",")])

In [13]:
# Column index map based on your schema:
# 0: InvoiceNo, 1: StockCode, 2: Description, 3: Quantity,
# 4: InvoiceDate, 5: UnitPrice, 6: CustomerID, 7: Country

In [14]:
def to_typed(row):
    # Guard against short or bad rows
    if len(row) < 8:
        return None
    try:
        return (
            row[0],                 # InvoiceNo (str)
            row[1],                 # StockCode (str)
            row[2],                 # Description (str)
            int(row[3]),            # Quantity (int)
            row[4],                 # InvoiceDate (str)
            float(row[5]),          # UnitPrice (float/decimal)
            int(row[6]),            # CustomerID (int)
            row[7]                  # Country (str)
        )
    except Exception:
        return None

typed_rdd = split_rdd.map(to_typed).filter(lambda x: x is not None)
typed_rdd.cache()  # reuse across queries
typed_rdd.take(3)


[('536365',
  '85123A',
  'WHITE HANGING HEART T-LIGHT HOLDER',
  6,
  '12/1/2010 8:26',
  2.55,
  17850,
  'United Kingdom'),
 ('536365',
  '71053',
  'WHITE METAL LANTERN',
  6,
  '12/1/2010 8:26',
  3.39,
  17850,
  'United Kingdom'),
 ('536365',
  '84406B',
  'CREAM CUPID HEARTS COAT HANGER',
  8,
  '12/1/2010 8:26',
  2.75,
  17850,
  'United Kingdom')]

In [20]:
#TOTAL COUNT :

typed_rdd.count()


342278

In [21]:
#Sample schema check

sample = typed_rdd.first()
print("InvoiceNo:", sample[0], "Description:", sample[2], "Quantity:", sample[3], "UnitPrice:", sample[5], "CustomerID:", sample[6])


InvoiceNo: 536365 Description: WHITE HANGING HEART T-LIGHT HOLDER Quantity: 6 UnitPrice: 2.55 CustomerID: 17850


In [22]:
typed_rdd.getNumPartitions()

2

In [15]:
#2. Display the unit price of each customer:


customer_unit_price = typed_rdd.map(lambda x: (x[6], x[5]))  # (CustomerID, UnitPrice)
customer_unit_price.take(10)


[(17850, 2.55),
 (17850, 3.39),
 (17850, 2.75),
 (17850, 3.39),
 (17850, 3.39),
 (17850, 7.65),
 (17850, 4.25),
 (17850, 1.85),
 (17850, 1.85),
 (13047, 1.69)]

In [23]:
#saving this into file :

customer_unit_price.coalesce(1).saveAsTextFile("/content/output_q2_single")


In [24]:
#Load back into Python (optional)
# If you want to read the saved file back into Python for further use:

with open("/content/output_q2_single/part-00000", "r") as f:
    lines = f.readlines()

print(lines[:10])  # show first 10 records


['(17850, 2.55)\n', '(17850, 3.39)\n', '(17850, 2.75)\n', '(17850, 3.39)\n', '(17850, 3.39)\n', '(17850, 7.65)\n', '(17850, 4.25)\n', '(17850, 1.85)\n', '(17850, 1.85)\n', '(13047, 1.69)\n']


In [16]:
# Average unit price per customer
sum_count = customer_unit_price.mapValues(lambda p: (p, 1)) \
                               .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
avg_unit_price = sum_count.mapValues(lambda sc: sc[0]/sc[1])
avg_unit_price.take(10)


[(17850, 3.9603703703703643),
 (13748, 3.9964285714285714),
 (15100, 10.95),
 (14688, 2.2630543933054392),
 (16098, 4.189166666666666),
 (18074, 4.7807692307692315),
 (17420, 3.8813333333333326),
 (16250, 3.376428571428572),
 (17548, 1.974705882352941),
 (13408, 3.4329729729729723)]

In [25]:
#saving this into file :

avg_unit_price.coalesce(1).saveAsTextFile("/content/output_q2_avg")


In [26]:
#Load back into Python (optional)
# If you want to read the saved file back into Python for further use:

with open("/content/output_q2_avg/part-00000", "r") as f:
    lines = f.readlines()

print(lines[:10])  # show first 10 records


['(17850, 3.9603703703703643)\n', '(13748, 3.9964285714285714)\n', '(15100, 10.95)\n', '(14688, 2.2630543933054392)\n', '(16098, 4.189166666666666)\n', '(18074, 4.7807692307692315)\n', '(17420, 3.8813333333333326)\n', '(16250, 3.376428571428572)\n', '(17548, 1.974705882352941)\n', '(13408, 3.4329729729729723)\n']


In [17]:
#9. Display the data where Description contains “white”:

white_rows = typed_rdd.filter(lambda x: x[2] and "white" in x[2].lower())
white_rows.take(10)


[('536365',
  '85123A',
  'WHITE HANGING HEART T-LIGHT HOLDER',
  6,
  '12/1/2010 8:26',
  2.55,
  17850,
  'United Kingdom'),
 ('536365',
  '71053',
  'WHITE METAL LANTERN',
  6,
  '12/1/2010 8:26',
  3.39,
  17850,
  'United Kingdom'),
 ('536365',
  '84029E',
  'RED WOOLLY HOTTIE WHITE HEART.',
  6,
  '12/1/2010 8:26',
  3.39,
  17850,
  'United Kingdom'),
 ('536373',
  '85123A',
  'WHITE HANGING HEART T-LIGHT HOLDER',
  6,
  '12/1/2010 9:02',
  2.55,
  17850,
  'United Kingdom'),
 ('536373',
  '71053',
  'WHITE METAL LANTERN',
  6,
  '12/1/2010 9:02',
  3.39,
  17850,
  'United Kingdom'),
 ('536373',
  '82483',
  'WOOD 2 DRAWER CABINET WHITE FINISH',
  2,
  '12/1/2010 9:02',
  4.95,
  17850,
  'United Kingdom'),
 ('536373',
  '82486',
  'WOOD S/3 CABINET ANT WHITE FINISH',
  4,
  '12/1/2010 9:02',
  6.95,
  17850,
  'United Kingdom'),
 ('536373',
  '82482',
  'WOODEN PICTURE FRAME WHITE FINISH',
  6,
  '12/1/2010 9:02',
  2.1,
  17850,
  'United Kingdom'),
 ('536373',
  '82494L',
  

In [27]:
#saving this into file :

white_rows.coalesce(1).saveAsTextFile("/content/output_q9_single")

In [28]:
#Load back into Python (optional)
# If you want to read the saved file back into Python for further use:

with open("/content/output_q9_single/part-00000", "r") as f:
    lines = f.readlines()

print(lines[:10])  # show first 10 records

["('536365', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', 6, '12/1/2010 8:26', 2.55, 17850, 'United Kingdom')\n", "('536365', '71053', 'WHITE METAL LANTERN', 6, '12/1/2010 8:26', 3.39, 17850, 'United Kingdom')\n", "('536365', '84029E', 'RED WOOLLY HOTTIE WHITE HEART.', 6, '12/1/2010 8:26', 3.39, 17850, 'United Kingdom')\n", "('536373', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', 6, '12/1/2010 9:02', 2.55, 17850, 'United Kingdom')\n", "('536373', '71053', 'WHITE METAL LANTERN', 6, '12/1/2010 9:02', 3.39, 17850, 'United Kingdom')\n", "('536373', '82483', 'WOOD 2 DRAWER CABINET WHITE FINISH', 2, '12/1/2010 9:02', 4.95, 17850, 'United Kingdom')\n", "('536373', '82486', 'WOOD S/3 CABINET ANT WHITE FINISH', 4, '12/1/2010 9:02', 6.95, 17850, 'United Kingdom')\n", "('536373', '82482', 'WOODEN PICTURE FRAME WHITE FINISH', 6, '12/1/2010 9:02', 2.1, 17850, 'United Kingdom')\n", "('536373', '82494L', 'WOODEN FRAME ANTIQUE WHITE', 6, '12/1/2010 9:02', 2.55, 17850, 'United Kingdom')\n", "('53

In [19]:
#12. Find which CustomerID has description related to “water bottle”:


water_bottle_customers = typed_rdd.filter(lambda x: x[2] and "water bottle" in x[2].lower()) \
                                  .map(lambda x: x[6]) \
                                  .distinct()
#water_bottle_customers.collect()
water_bottle_customers.take(10)




[17850, 17548, 13408, 13448, 15862, 17908, 17920, 12838, 13758, 12868]

In [29]:
#saving this into file :

water_bottle_customers.coalesce(1).saveAsTextFile("/content/output_q12_single")

In [30]:
#Load back into Python (optional)
# If you want to read the saved file back into Python for further use:

with open("/content/output_q12_single/part-00000", "r") as f:
    lines = f.readlines()

print(lines[:10])  # show first 10 records

['17850\n', '17548\n', '13408\n', '13448\n', '15862\n', '17908\n', '17920\n', '12838\n', '13758\n', '12868\n']


In [31]:
#----------------------- CONVERTING INTO CSV FILE -------------------

In [39]:
#output_q2

#Step 1: Clean and Parse the Output:

import ast

# Read and clean the saved part file
with open("/content/output_q2_single/part-00000", "r") as f:
    lines = [line.strip() for line in f]

# Convert string tuples into Python tuples
parsed = [ast.literal_eval(line) for line in lines]
print(parsed[:10])  # sample preview


[(17850, 2.55), (17850, 3.39), (17850, 2.75), (17850, 3.39), (17850, 3.39), (17850, 7.65), (17850, 4.25), (17850, 1.85), (17850, 1.85), (13047, 1.69)]


In [35]:
#Step 2: Convert to Pandas DataFrame :

import pandas as pd

# Create DataFrame with proper column names
df_q2 = pd.DataFrame(parsed, columns=["CustomerID", "UnitPrice"])

# Preview
df_q2.head()


Unnamed: 0,CustomerID,UnitPrice
0,17850,2.55
1,17850,3.39
2,17850,2.75
3,17850,3.39
4,17850,3.39


In [36]:
#Step 3: Export to CSV:

# Save as CSV file
df_q2.to_csv("/content/output_q2.csv", index=False)

# # Verify file exists
!ls -lh /content/output_q2.csv


-rw-r--r-- 1 root root 3.6M Jan  3 13:21 /content/output_q2.csv


In [38]:
#Step 4: Download the CSV in Colab:
#This will trigger a download of output_q2.csv to your local machine.
#You can then open it directly in Excel or Power BI.

from google.colab import files
files.download("/content/output_q2.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [40]:
#----------------

In [41]:
#output_q2_avg

#Step 1: Clean and Parse the Output:

import ast

# Read and clean the saved part file
with open("/content/output_q2_avg/part-00000", "r") as f:
    lines = [line.strip() for line in f]

# Convert string tuples into Python tuples
parsed = [ast.literal_eval(line) for line in lines]
print(parsed[:10])  # sample preview


[(17850, 3.9603703703703643), (13748, 3.9964285714285714), (15100, 10.95), (14688, 2.2630543933054392), (16098, 4.189166666666666), (18074, 4.7807692307692315), (17420, 3.8813333333333326), (16250, 3.376428571428572), (17548, 1.974705882352941), (13408, 3.4329729729729723)]


In [42]:
#Step 2: Convert to Pandas DataFrame :

import pandas as pd

# Create DataFrame with proper column names
df_q2_avg = pd.DataFrame(parsed, columns=["CustomerID", "AvgUnitPrice"])

# Preview
df_q2_avg.head()

Unnamed: 0,CustomerID,AvgUnitPrice
0,17850,3.96037
1,13748,3.996429
2,15100,10.95
3,14688,2.263054
4,16098,4.189167


In [43]:
#Step 3: Export to CSV:

# Save as CSV file
df_q2_avg.to_csv("/content/output_q2_avg.csv", index=False)

# # Verify file exists
!ls -lh /content/output_q2_avg.csv

-rw-r--r-- 1 root root 92K Jan  3 13:29 /content/output_q2_avg.csv


In [44]:
#Step 4: Download the CSV in Colab:
#This will trigger a download of output_q2_avg.csv to your local machine.
#You can then open it directly in Excel or Power BI.

from google.colab import files
files.download("/content/output_q2_avg.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [45]:
#----------------

In [46]:
#output_q9

#Step 1: Clean and Parse the Output:

import ast

# Read and clean the saved part file
with open("/content/output_q9_single/part-00000", "r") as f:
    lines = [line.strip() for line in f]

# Convert string tuples into Python tuples
parsed = [ast.literal_eval(line) for line in lines]
print(parsed[:10])  # sample preview

[('536365', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', 6, '12/1/2010 8:26', 2.55, 17850, 'United Kingdom'), ('536365', '71053', 'WHITE METAL LANTERN', 6, '12/1/2010 8:26', 3.39, 17850, 'United Kingdom'), ('536365', '84029E', 'RED WOOLLY HOTTIE WHITE HEART.', 6, '12/1/2010 8:26', 3.39, 17850, 'United Kingdom'), ('536373', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', 6, '12/1/2010 9:02', 2.55, 17850, 'United Kingdom'), ('536373', '71053', 'WHITE METAL LANTERN', 6, '12/1/2010 9:02', 3.39, 17850, 'United Kingdom'), ('536373', '82483', 'WOOD 2 DRAWER CABINET WHITE FINISH', 2, '12/1/2010 9:02', 4.95, 17850, 'United Kingdom'), ('536373', '82486', 'WOOD S/3 CABINET ANT WHITE FINISH', 4, '12/1/2010 9:02', 6.95, 17850, 'United Kingdom'), ('536373', '82482', 'WOODEN PICTURE FRAME WHITE FINISH', 6, '12/1/2010 9:02', 2.1, 17850, 'United Kingdom'), ('536373', '82494L', 'WOODEN FRAME ANTIQUE WHITE', 6, '12/1/2010 9:02', 2.55, 17850, 'United Kingdom'), ('536373', '84029E', 'RED WOOLLY HOTTIE W

In [47]:
#Step 2: Convert to Pandas DataFrame :

import pandas as pd

# Create DataFrame with proper column names
df_q9 = pd.DataFrame(parsed, columns=["InvoiceNO", "StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID", "Country"])

# Preview
df_q9.head()

Unnamed: 0,InvoiceNO,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
2,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom
3,536373,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 9:02,2.55,17850,United Kingdom
4,536373,71053,WHITE METAL LANTERN,6,12/1/2010 9:02,3.39,17850,United Kingdom


In [48]:
#Step 3: Export to CSV:

# Save as CSV file
df_q9.to_csv("/content/output_q9.csv", index=False)

# # Verify file exists
!ls -lh /content/output_q9.csv

-rw-r--r-- 1 root root 1.2M Jan  3 13:34 /content/output_q9.csv


In [49]:
#Step 4: Download the CSV in Colab:
#This will trigger a download of output_q2_avg.csv to your local machine.
#You can then open it directly in Excel or Power BI.

from google.colab import files
files.download("/content/output_q9.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [50]:
#----------------

In [52]:
#output_q12

#Step 1: Clean and Parse the Output:

import ast

# Read and clean the saved part file
with open("/content/output_q12_single/part-00000", "r") as f:
    lines = [line.strip() for line in f]

# Convert string tuples into Python tuples
parsed = [ast.literal_eval(line) for line in lines]
print(parsed[:10])  # sample preview

[17850, 17548, 13408, 13448, 15862, 17908, 17920, 12838, 13758, 12868]


In [53]:
#Step 2: Convert to Pandas DataFrame :

import pandas as pd

# Create DataFrame with proper column names
df_q12 = pd.DataFrame(parsed, columns=["CustomerID"])

# Preview
df_q12.head()

Unnamed: 0,CustomerID
0,17850
1,17548
2,13408
3,13448
4,15862


In [54]:
#Step 3: Export to CSV:

# Save as CSV file
df_q12.to_csv("/content/output_q12.csv", index=False)

# # Verify file exists
!ls -lh /content/output_q12.csv

-rw-r--r-- 1 root root 7.6K Jan  3 13:38 /content/output_q12.csv


In [55]:
#Step 4: Download the CSV in Colab:
#This will trigger a download of output_q2_avg.csv to your local machine.
#You can then open it directly in Excel or Power BI.

from google.colab import files
files.download("/content/output_q12.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>