In [1]:
# loading SparkContext and SparkConf from the library pyspark
# Setting the application name and cluster mode using SparkConf

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("YARN").setAppName("RDD Operations")
sc = SparkContext().getOrCreate(conf=conf)

In [None]:
# reading a text file into an RDD
# transformation

rdd = sc.textFile("/common_folder/pyspark_data/blogtext/blogtexts.txt")

In [None]:
# collect: loads the complete data in the driver program
# action

rdd.collect()

In [None]:
# take: returns fixed numner of elements of he RDD
# action

rdd.take(5)

In [None]:
# defining a function (same as python)

def lowerSplit(lines):
      lines = lines.lower()
      lines = lines.split()
      return lines

In [None]:
# map:implementing the function on each element in the RDD

rdd_1 = rdd.map(lowerSplit)

In [None]:
# action

rdd_1.take(5)

In [None]:
# flatMap: implements a function on each element where the output of each element may not be a single element
# better than map if you want the each element separately 

rdd_2 = rdd.flatMap(lowerSplit)

In [None]:
rdd_2.take(5)

In [None]:
# removing a list of frequent appearing words from the rdd
# filter: filters the elements in the RDD based in the condition provided
# filter is a transformation

stopwords = ['is','am','are','the','for','a', "-", '=', '1',"of", "it",'–',"to","in","and","we","can","on","you","how","/"]
rdd_3 = rdd_2.filter(lambda x: x not in stopwords)
rdd_3.take(10)

In [None]:
# We have to pass a function inside the “groupBy” which will take the first 3 characters of each word in “rdd_3”.
# The key is the first 3 characters and value is all the words which start with these 3 characters

rdd_4 = rdd_3.groupBy(lambda w: w[0:3])
print([(k, list(v)) for (k, v) in rdd_4.take(2)])

In [None]:
# aggregating the count of similar words together

rdd_3_mapped = rdd_3.map(lambda x: (x,1))
rdd_3_grouped = rdd_3_mapped.groupByKey()

# ('key', [value1, value2, ...])

In [None]:
# grouping all the occurences as 1

print(list((j[0], list(j[1])) for j in rdd_3_grouped.take(5)))

In [None]:
# aggregating occurences of each word and sorting it based on the count

rdd_3_freq_of_words = rdd_3_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey()

In [None]:
# action

rdd_3_freq_of_words.take(10)

In [None]:
# aggregating occurences of each word using reduceby (instead of groupby) and sorting it in descending order based on the count

rdd_3_mapped.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(10)

In [None]:
# counting the words in each partition separately

def func(iterator):
    count_spark = 0
    count_apache = 0
    for i in iterator:
        if i =='spark':
            count_spark = count_spark + 1
        if i == 'apache':
            count_apache = count_apache + 1
    return (count_spark,count_apache)

In [None]:
# mapPartitions: mapping the defined function on each partiotion individually
# glom(): returns an RDD created by coalescing all elements within each partition into a list

rdd_3.mapPartitions(func).glom().collect()

In [None]:
# generating samples from the data

rdd_3_sampled = rdd_3.sample(False, 0.4, 42)
print(len(rdd_3.collect()),len(rdd_3_sampled.collect()))

In [None]:
# taking union of two datasets

sample_1 = rdd_3.sample(False,0.2,42)
sample_2 =rdd_3.sample(False,0.2,42)
union_of_sample1_sample2 = sample_1.union(sample_2)
print(len(sample_1.collect()), len(sample_2.collect()),len(union_of_sample1_sample2.collect()))

In [None]:
# joiing two RDDs based on the key

sample_1 = rdd_3_mapped.sample(False,.2,42)
sample_2 = rdd_3_mapped.sample(False,.2,42)
join_on_sample1_sample2 = sample_1.join(sample_2)
join_on_sample1_sample2.take(20)

In [None]:
# distict elements

rdd_3_distinct = rdd_3.distinct()
len(rdd_3_distinct.collect())

In [None]:
# getting the number of partitions in the rdd

rdd_3.getNumPartitions()

In [None]:
# reducing the number of partitions in the rdd

rdd_3_coalesce = rdd_3.coalesce(3)

In [None]:
# reducing the number of partitions in the rdd using repartition
# repartition can be used to increase the partitions as well

rdd_3_coalesce=rdd_3.repartition(4)

In [None]:
# getting the number of partitions in the rdd

rdd_3_coalesce.getNumPartitions()

In [None]:
# using range command to create rdd with parallelize 

num_rdd = sc.parallelize(range(1,1000))
num_rdd.reduce(lambda x,y: x+y)

In [None]:
b = range(1,1000)
c = lambda x,y: x+y
print (  c )

In [None]:
num_rdd

In [None]:
# count of elements in RDD

rdd_3.count()

In [None]:
# actions: mathematical operations performed over the RDD

num_rdd.max(),num_rdd.min(), num_rdd.sum(),num_rdd.variance(),num_rdd.stdev()

In [41]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark DataFrame and Sql").getOrCreate()

In [42]:
df = spark.read.csv("/common_folder/airlines/data_2004-08.csv",header=True)

In [43]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (

In [44]:
df.createOrReplaceTempView("dfTable")

In [53]:
spark.sql('SELECT Dest,avg(DepDelay)  FROM dfTable group by Dest order by avg(ArrDelay) desc').show()

+----+-----------------------------+
|Dest|avg(CAST(DepDelay AS DOUBLE))|
+----+-----------------------------+
| BIL|                         80.2|
| JAC|           37.833333333333336|
| GRR|           31.457142857142856|
| DAY|            28.16867469879518|
| GUC|                         34.3|
| CVG|           23.071428571428573|
| DSM|            26.61842105263158|
| GEG|           23.790697674418606|
| TUS|            24.46969696969697|
| PSP|            12.26923076923077|
| ORD|           16.441696823482108|
| BOI|           21.566265060240966|
| GSO|                         26.0|
| SLC|           21.431556948798328|
| ATL|           20.791808873720136|
| BTV|           22.102941176470587|
| ICT|           17.614285714285714|
| OGG|            25.82608695652174|
| SMF|           17.330801104972377|
| IAH|            16.72151898734177|
+----+-----------------------------+
only showing top 20 rows



In [36]:
df.head()

Row(_c0='1', Year='2006', Month='1', DayofMonth='11', DayOfWeek='3', DepTime='743', CRSDepTime='745', ArrTime='1024', CRSArrTime='1018', UniqueCarrier='US', FlightNum='343', TailNum='N657AW', ActualElapsedTime='281', CRSElapsedTime='273', AirTime='223', ArrDelay='6', DepDelay='-2', Origin='ATL', Dest='PHX', Distance='1587', TaxiIn='45', TaxiOut='13', Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='0', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0')

In [37]:
df.count()

100000

In [38]:
df.describe()

DataFrame[summary: string, _c0: string, Year: string, Month: string, DayofMonth: string, DayOfWeek: string, DepTime: string, CRSDepTime: string, ArrTime: string, CRSArrTime: string, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: string, CancellationCode: string, Diverted: string, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

In [47]:
df.head(5)

[Row(_c0='1', Year='2006', Month='1', DayofMonth='11', DayOfWeek='3', DepTime='743', CRSDepTime='745', ArrTime='1024', CRSArrTime='1018', UniqueCarrier='US', FlightNum='343', TailNum='N657AW', ActualElapsedTime='281', CRSElapsedTime='273', AirTime='223', ArrDelay='6', DepDelay='-2', Origin='ATL', Dest='PHX', Distance='1587', TaxiIn='45', TaxiOut='13', Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='0', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0'),
 Row(_c0='2', Year='2006', Month='1', DayofMonth='11', DayOfWeek='3', DepTime='1053', CRSDepTime='1053', ArrTime='1313', CRSArrTime='1318', UniqueCarrier='US', FlightNum='613', TailNum='N834AW', ActualElapsedTime='260', CRSElapsedTime='265', AirTime='214', ArrDelay='-5', DepDelay='0', Origin='ATL', Dest='PHX', Distance='1587', TaxiIn='27', TaxiOut='19', Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='0', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0'),
