In [1]:
#Mark Russeff
#Homework 7
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext('local[*]')

In [2]:
data_file = "./OLX_Car_Data_CSV.csv"
data = sc.textFile(data_file)

data_header = data.first() 
header = sc.parallelize([data_header])
raw_data = data.subtract(header) #extract header

In [3]:
raw_data.count()

24973

In [4]:
raw_data.take(5)

['Toyota,Used,Diesel,1,Prado,2100000,Karachi,Cash,1997',
 'Toyota,Used,Diesel,1,Prado,2100000,Karachi,Cash,1997',
 'Suzuki,Used,Petrol,94000,Alto,535000,Karachi,Cash,2010',
 'Toyota,Used,Petrol,100000,Corolla XLI,1430000,Karachi,Cash,2013',
 'Suzuki,New,CNG,65000,Cultus VXL,450000,Karachi,Cash,2006']

In [5]:
#Creating an RDD using parellelize
a = range(100)
    
data = sc.parallelize(a)

In [6]:
data.count()

100

In [7]:
data.take(5)

[0, 1, 2, 3, 4]

In [17]:
#Filter transformation
#Counting the number of Toyotas
toyota_raw_data = raw_data.filter(lambda x: 'Toyota' in x)

In [18]:
from time import time
t0 = time()
toyota_count = toyota_raw_data.count()
tt = time() - t0

print("There are {} Toyotas".format(toyota_count))
print("Count completed in {} seconds".format(round(tt,3)))

There are 4981 Toyotas
Count completed in 0.399 seconds


In [19]:
#Map transformation
from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0

print("Parse completed in {} seconds".format(round(tt,3)))
pprint(head_rows[0])

Parse completed in 0.127 seconds
['Toyota', 'Used', 'Diesel', '1', 'Prado', '2100000', 'Karachi', 'Cash', '1997']


In [20]:
#Take a lot of elements for Map transformation
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0

print("Parse completed in {} seconds".format(round(tt,3)))

Parse completed in 1.474 seconds


In [24]:
#Map with predefined function
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[0]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

('Toyota',
 ['Toyota',
  'Used',
  'Diesel',
  '1',
  'Prado',
  '2100000',
  'Karachi',
  'Cash',
  '1997'])


In [25]:
#The collect action
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0

print("Data collected in {} seconds".format(round(tt,3)))

Data collected in 0.524 seconds


In [29]:
#Run all in one...
#Get data from file
data_file = "./OLX_Car_Data_CSV.csv"
data = sc.textFile(data_file)

data_header = data.first() 
header = sc.parallelize([data_header])
raw_data = data.subtract(header)

#Parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

#Filter Toyota key elements
toyota_key = key_csv_data.filter(lambda x: x[0] == "Toyota")

#Collect all
t0 = time()
all_toyota = toyota_key.collect()
tt = time() - t0
toyota_count = len(all_toyota)

print("Data collected in {} seconds".format(round(tt,3)))
print("There are {} 'Toyotas'".format(toyota_count))

Data collected in 1.052 seconds
There are 4981 'Toyotas'


In [30]:
#The sample transformation
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()

print("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 2363 of 24973


In [37]:
# Transformations to be applied
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_toyota_tags = raw_data_sample_items.filter(lambda x: "Toyota" in x)

#Actions + time
t0 = time()
sample_toyota_tags_count = sample_toyota_tags.count()
tt = time() - t0

sample_toyota_ratio = sample_toyota_tags_count / float(sample_size)
print("The ratio of 'Toyotas' is {}".format(round(sample_toyota_ratio,3))) 
print("Count done in {} seconds".format(round(tt,3)))

The ratio of 'Toyotas' is 0.201
Count done in 0.775 seconds


In [38]:
#Without Sampling
#Transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
toyota_tags = raw_data_items.filter(lambda x: "Toyota" in x)

#Actions + time
t0 = time()
toyota_tags_count = toyota_tags.count()
tt = time() - t0

toyota_ratio = toyota_tags_count / float(total_size)
print("The ratio of 'Toyotas' is {}".format(round(toyota_ratio,3))) 
print("Count done in {} seconds".format(round(tt,3)))

The ratio of 'Toyotas' is 0.199
Count done in 0.58 seconds


In [40]:
#The takeSample action
t0 = time()
raw_data_sample = raw_data.takeSample(False, 2500, 1234)
toyota_data_sample = [x.split(",") for x in raw_data_sample if "Toyota" in x]
tt = time() - t0

toyota_sample_size = len(toyota_data_sample)

toyota_ratio = toyota_sample_size / 2500.0
print("The ratio of 'Toyotas' is {}".format(toyota_ratio))
print("Count done in {} seconds".format(round(tt,3)))

The ratio of 'Toyotas' is 0.1996
Count done in 0.845 seconds


In [42]:
#Getting New Cars using substract
used_raw_data = raw_data.filter(lambda x: "Used" in x)

In [43]:
new_raw_data = raw_data.subtract(used_raw_data)

In [44]:
#Count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0

print("All count in {} secs".format(round(tt,3)))

All count in 0.392 secs


In [45]:
#Count used cars
t0 = time()
used_raw_data_count = used_raw_data.count()
tt = time() - t0

print("Used car count in {} secs".format(round(tt,3)))

Used car count in 0.545 secs


In [47]:
#Count new cars
t0 = time()
new_raw_data_count = new_raw_data.count()
tt = time() - t0
print("New car count in {} secs".format(round(tt,3)))

New car count in 2.316 secs


In [48]:
print("There are {} used cars and {} new cars, \
from a total of {} cars".format(used_raw_data_count,new_raw_data_count,raw_data_count))

There are 18472 used cars and 6501 new cars, from a total of 24973 cars


In [49]:
#Protocol and Service Combinations Using cartesian
#Get the Makes
csv_data = raw_data.map(lambda x: x.split(","))
makes = csv_data.map(lambda x: x[0]).distinct()
makes.collect()

['Suzuki',
 'Daihatsu',
 'Other Brands',
 'Mitsubishi',
 'KIA',
 'Daewoo',
 'FAW',
 'Classic & Antiques',
 '',
 'Porsche',
 'Range Rover',
 'Land Rover',
 'Toyota',
 'Honda',
 'Nissan',
 'Hyundai',
 'Mercedes',
 'Mazda',
 'BMW',
 'Chevrolet',
 'Audi',
 'Lexus',
 'Changan',
 'Subaru']

In [50]:
fuel = csv_data.map(lambda x: x[2]).distinct()
fuel.collect()

['Diesel', 'CNG', 'LPG', '', 'Petrol', 'Hybrid']

In [51]:
#Get the Cartesian Product
product = makes.cartesian(fuel).collect()
print("There are {} combinations of make X fuel type".format(len(product)))

There are 144 combinations of make X fuel type
