In [None]:
!pip install pyspark



In [1]:
from pyspark.sql import SparkSession
# Local Spark session for this notebook; the Spark UI is bound to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the active SparkSession object as this cell's output.
spark

In [None]:
# The orders file has no header row: its first line is already order 1
# ('1,2013-07-25 00:00:00.0,11599,CLOSED'). header=True would silently consume that
# order as column names, so read with header=False and name the columns explicitly.
# Names follow this notebook's own field usage (index 2 = customer id, index 3 = status).
df = spark.read.csv('/content/part-00000', header=False).toDF(
    "order_id", "order_date", "order_customer_id", "order_status"
)

In [None]:
# Load the raw orders file as an RDD with one string element per text line.
orders_rdd = spark.sparkContext.textFile(name='/content/part-00000')

In [None]:
# Peek at the first 7 raw lines of the orders file (each is one comma-separated string).
orders_rdd.take(7)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE']

In [None]:
# Peek at the first 5 raw order lines; fields appear to be order id, order timestamp,
# customer id and order status (later cells use index 2 as customer and index 3 as status).
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [None]:
# Key every order line by its status (4th comma-separated field) with a count of 1.
mapped_rdd = orders_rdd.map(lambda line: (line.split(",")[3], 1))

In [None]:
# Inspect the first 10 (status, 1) pairs.
mapped_rdd.take(10)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('PROCESSING', 1),
 ('PENDING_PAYMENT', 1),
 ('PENDING_PAYMENT', 1)]

In [None]:
# Sum the 1s per status key, giving the number of orders in each status.
reduced_rdd = mapped_rdd.reduceByKey(lambda left, right: left + right)

In [None]:
# Count the number of orders in each status (collect() is safe: only one row per status).
reduced_rdd.collect()

[('CLOSED', 7556),
 ('PENDING_PAYMENT', 15030),
 ('COMPLETE', 22899),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('CANCELED', 1428),
 ('SUSPECTED_FRAUD', 1558)]

In [None]:
# Sort the per-status order counts alphabetically by status name (the key).
# This could also be done in Excel, but it is just as easy in Spark.
xy = reduced_rdd.sortByKey(ascending=True)
xy.collect()


[('CANCELED', 1428),
 ('CLOSED', 7556),
 ('COMPLETE', 22899),
 ('ON_HOLD', 3798),
 ('PAYMENT_REVIEW', 729),
 ('PENDING', 7610),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('SUSPECTED_FRAUD', 1558)]

In [None]:
# Re-display the first 5 raw order lines before switching to per-customer analysis.
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [None]:
# Key every order by its customer id (3rd comma-separated field) with a count of 1.
customers_mapped = orders_rdd.map(lambda line: (line.split(",")[2], 1))

In [None]:
# Inspect the first 5 (customer_id, 1) pairs; customer ids are still strings here.
customers_mapped.take(5)

[('11599', 1), ('256', 1), ('12111', 1), ('8827', 1), ('11318', 1)]

In [None]:
# Total number of orders placed by each customer id.
customers_aggregated = customers_mapped.reduceByKey(lambda a, b: a + b)

In [None]:
# Top 10 customers by number of orders, highest first. top() keeps only the best 10
# per partition and merges them on the driver, instead of a full distributed sort
# (sampling job + shuffle) just to keep 10 rows. Ordering among equal counts is arbitrary.
customers_aggregated.top(10, key=lambda pair: pair[1])

[('6316', 16),
 ('12431', 16),
 ('5897', 16),
 ('569', 16),
 ('4320', 15),
 ('5283', 15),
 ('12284', 15),
 ('5654', 15),
 ('221', 15),
 ('5624', 15)]

In [None]:
# Unique customer ids across all orders (the misspelled name is kept because later cells use it).
dintinct_customers = (
    orders_rdd
    .map(lambda line: line.split(",")[2])
    .distinct()
)

In [None]:
# Number of distinct customer ids appearing in the orders file.
dintinct_customers.count()

12405

In [None]:
# Keep only orders whose status field (4th field) is exactly 'CLOSED'.
filtered_orders = orders_rdd.filter(lambda line: line.split(",")[3] == 'CLOSED')

In [None]:
# Inspect the first 10 CLOSED orders.
filtered_orders.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED']

In [None]:
# Key each CLOSED order by its customer id with a count of 1.
filtered_mapped = filtered_orders.map(lambda line: (line.split(",")[2], 1))

In [None]:
# Inspect the first 5 (customer_id, 1) pairs for CLOSED orders.
filtered_mapped.take(5)

[('11599', 1), ('8827', 1), ('1837', 1), ('1205', 1), ('11441', 1)]

In [None]:
# Number of CLOSED orders per customer id.
filtered_aggregated = filtered_mapped.reduceByKey(lambda a, b: a + b)

In [None]:
# Top 10 customers by number of CLOSED orders, highest first. top() avoids sorting the
# whole RDD just to keep 10 rows; ordering among equal counts is arbitrary.
filtered_aggregated.top(10, key=lambda pair: pair[1])

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('9740', 4)]

In [None]:
# Word count: develop logic to find the frequency of each word

In [None]:
# Sample words with mixed casing, used to demonstrate a case-insensitive word count.
words = (
    "big", "DAta", "is", "SUPER", "INTERESTING",
    "BIG", "DATA", "IS", "a", "TRENDING", "TECHNOLOGY",
)

In [None]:
# parallelize() turns a local Python collection into a distributed RDD.
words_rdd = spark.sparkContext.parallelize(words)

In [None]:
 # Lower-case every word so that e.g. "BIG" and "big" are counted as the same word.
 words_normalized = words_rdd.map(str.lower)

In [None]:
# Bring the lower-cased words back to the driver (only 11 items, so collect() is safe).
words_normalized.collect()

['big',
 'data',
 'is',
 'super',
 'interesting',
 'big',
 'data',
 'is',
 'a',
 'trending',
 'technology']

In [None]:
# Pair each word with an initial count of 1.
mapped_words = words_normalized.map(lambda word: (word, 1))

In [None]:
# Inspect the first 10 (word, 1) pairs.
mapped_words.take(10)

[('big', 1),
 ('data', 1),
 ('is', 1),
 ('super', 1),
 ('interesting', 1),
 ('big', 1),
 ('data', 1),
 ('is', 1),
 ('a', 1),
 ('trending', 1)]

In [None]:
# Sum the counts per word to get each word's frequency.
mapped_aggregated = mapped_words.reduceByKey(lambda count_a, count_b: count_a + count_b)

In [None]:
# Only 8 distinct words exist, so take(10) returns every (word, count) pair.
mapped_aggregated.take(10)

[('big', 2),
 ('data', 2),
 ('is', 2),
 ('super', 1),
 ('interesting', 1),
 ('a', 1),
 ('trending', 1),
 ('technology', 1)]

In [None]:
# The same word count can also be written as one chained pipeline of transformations.
result = (
    spark.sparkContext.parallelize(words)
    .map(lambda word: word.lower())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)


In [None]:
# Collect the word counts produced by the chained pipeline.
result.collect()

[('big', 2),
 ('data', 2),
 ('is', 2),
 ('super', 1),
 ('interesting', 1),
 ('a', 1),
 ('trending', 1),
 ('technology', 1)]

In [None]:
# Parallelize: inspecting partitions and default parallelism

In [None]:
# Number of partitions of the RDD created with parallelize() (no explicit slice count was
# given); the output matches sc.defaultParallelism shown in the next cell.
words_rdd.getNumPartitions()

1

In [None]:
# Default parallelism of this SparkContext. The session was built with master("local"),
# which presumably runs a single worker thread — consistent with the value 1 shown.
spark.sparkContext.defaultParallelism

1

In [None]:
# Reload the orders file (same path as the first load) before inspecting its partitioning defaults.
orders_rdd = spark.sparkContext.textFile(name='/content/part-00000')

In [8]:
# Default minimum partition count used for file reads such as textFile() when no
# minPartitions is passed (per the Spark API docs — confirm); it is 1 for this session.
spark.sparkContext.defaultMinPartitions

1

In [None]:
# Spark DAG visualization

In [4]:
# Small list of integers used to demonstrate an RDD reduce action (their sum is 40).
my_list = [1, 4, 6, 8, 9, 12]

In [5]:
# Distribute the list as an RDD, leaving the number of slices at Spark's default.
base_rdd = spark.sparkContext.parallelize(my_list, numSlices=None)

In [6]:
# reduce() is an action: it triggers the job and folds all elements together with +.
base_rdd.reduce(lambda acc, value: acc + value)

40