In [1]:
import pandas as pd

In [2]:
import os
os.environ['JAVA_HOME']="/usr/java/jre1.8.0-x64"

from pyspark import SparkContext
sc=SparkContext.getOrCreate()



In [8]:
sc

In [6]:
file_path = 'salestxns.tsv'

# Define custom column names
column_names = ['Txn Id',
'Category Id',
'Category Name',
'Product Id',
'Product Name', 
'Price',
'Quantity',
'Customer Id']

# Read the CSV file with custom separator and column names
df = pd.read_csv(file_path, sep='\t', names=column_names)

In [9]:
data=sc.parallelize(df, numSlices=2)

In [10]:
data.take(2)

['Txn Id', 'Category Id']

In [11]:
from pyspark.sql import SparkSession

In [13]:
spark=SparkSession.builder.appName("Pandastordd").getOrCreate()

In [14]:
rdd=spark.sparkContext.parallelize(df.values)

In [15]:
rdd.take(5)

[array([1, 43, 'Camping & Hiking', 957,
        "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 11599],
       dtype=object),
 array([2, 48, 'Water Sports', 1073, 'Pelican Sunstream 100 Kayak', 199.99,
        1, 256], dtype=object),
 array([3, 24, "Women's Apparel", 502,
        "Nike Men's Dri-FIT Victory Golf Polo", 50.0, 5, 256], dtype=object),
 array([4, 18, "Men's Footwear", 403,
        "Nike Men's CJ Elite 2 TD Football Cleat", 129.99, 1, 256],
       dtype=object),
 array([5, 40, 'Accessories', 897,
        'Team Golf New England Patriots Putter Grip', 24.99, 2, 8827],
       dtype=object)]

### Load Data as RDD

In [16]:
rdd_c = spark.sparkContext.textFile('salestxns.tsv')

In [17]:
# Split each line by tab to create a list of values
rdd_split = rdd_c.map(lambda line: line.split('\t'))

In [20]:
header = rdd_split.first()
rdd_data = rdd_split.filter(lambda row: row != header)

In [55]:
rdd_c

salestxns.tsv MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

In [22]:
rdd_c.take(20)

["1\t43\tCamping & Hiking\t957\tDiamondback Women's Serene Classic Comfort Bi\t299.98\t1\t11599",
 '2\t48\tWater Sports\t1073\tPelican Sunstream 100 Kayak\t199.99\t1\t256',
 "3\t24\tWomen's Apparel\t502\tNike Men's Dri-FIT Victory Golf Polo\t50\t5\t256",
 "4\t18\tMen's Footwear\t403\tNike Men's CJ Elite 2 TD Football Cleat\t129.99\t1\t256",
 '5\t40\tAccessories\t897\tTeam Golf New England Patriots Putter Grip\t24.99\t2\t8827',
 '6\t17\tCleats\t365\tPerfect Fitness Perfect Rip Deck\t59.99\t5\t8827',
 "7\t24\tWomen's Apparel\t502\tNike Men's Dri-FIT Victory Golf Polo\t50\t3\t8827",
 "8\t46\tIndoor/Outdoor Games\t1014\tO'Brien Men's Neoprene Life Vest\t49.98\t4\t8827",
 "9\t43\tCamping & Hiking\t957\tDiamondback Women's Serene Classic Comfort Bi\t299.98\t1\t11318",
 '10\t17\tCleats\t365\tPerfect Fitness Perfect Rip Deck\t59.99\t5\t11318',
 "11\t46\tIndoor/Outdoor Games\t1014\tO'Brien Men's Neoprene Life Vest\t49.98\t2\t11318",
 "12\t43\tCamping & Hiking\t957\tDiamondback Women's Serene Cl

### Find out the average sales in each category

In [23]:
# Assuming 'Price' is in column index 5 and 'Category Name' is in column index 2
category_sales = rdd_data.map(lambda row: (row[2], float(row[5])))
average_sales_by_category = category_sales.groupByKey().mapValues(lambda values: sum(values) / len(values))

In [25]:
average_sales_by_category.take(5)

[('Water Sports', 200.37610038616728),
 ("Women's Apparel", 50.0),
 ("Men's Footwear", 129.9900000000552),
 ('Accessories', 24.98999999999998),
 ('Cleats', 60.341920492028706)]

### Find out the customer who  maximum & minimum spends

In [75]:
customer_spends = rdd_data.map(lambda row: (row[7], float(row[5])))
max_spend = customer_spends.groupByKey().mapValues(lambda x: sum(x))
min_spend = customer_spends.groupByKey().mapValues(lambda x: sum(x))

In [73]:
max_spend.max(key=lambda x:x[1])

('791', 7736.4599999999955)

In [69]:
#customer_with_max_spend = max_spend.first()
#customer_with_max_spend 

In [70]:
#customer_with_min_spend = min_spend.first()
#customer_with_min_spend

In [74]:
min_spend.min(key=lambda x:x[1])

('7091', 9.99)

### Most selling category/product
### Least selling category/product

In [76]:
category_sales = rdd_data.map(lambda row: (row[2], int(row[6])))
product_sales = rdd_data.map(lambda row: (row[4], int(row[6])))
max_cat = category_sales.groupByKey().mapValues(lambda x: sum(x))
min_cat= category_sales.groupByKey().mapValues(lambda x: sum(x))
max_prod = product_sales.groupByKey().mapValues(lambda x: sum(x))
min_prod= product_sales.groupByKey().mapValues(lambda x: sum(x))

In [77]:
max_cat.max(key=lambda x:x[1])

('Cleats', 73734)

In [79]:
min_cat.min(key=lambda x:x[1])

('Golf Bags & Carts', 61)

In [80]:
max_prod.max(key=lambda x:x[1])

('Perfect Fitness Perfect Rip Deck', 73698)

In [81]:
min_prod.min(key=lambda x:x[1])

('Bowflex SelectTech 1090 Dumbbells', 10)