# Spark Basic
Basic Spark data operations (create, read, transform, filter, join, query)
using RDDs & DataFrames

## Resilient Distributed Datasets (RDDs)

In [None]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.functions import col, max as max_
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import subprocess
import datetime

In [None]:
# Create (or reuse) the SparkContext for this notebook.
# getOrCreate() returns the already-active context when this cell is re-run.
# Calling the SparkContext constructor a second time raises
# "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName('spark-basic')
sc = SparkContext.getOrCreate(conf=conf)


In [None]:
# numSlices (the number of partitions for the RDD) is 2.
num_rdd = sc.parallelize([1,2,3,4,5], 2)

In [None]:
# Show the Python type of the object returned by sc.parallelize.
type(num_rdd)

In [None]:
# first(): return the first element of the RDD.
num_rdd.first()

In [None]:
# mapper: square every element, then collect() the results to the driver.
num_rdd.map(lambda n: n ** 2).collect()

In [None]:
# mapper & reducer: square each element, then fold the squares into one sum.
num_rdd.map(lambda n: n ** 2).reduce(lambda total, v: total + v)

In [None]:
# Rebind num_rdd to six elements spread over numSlices = 3 partitions.
num_rdd = sc.parallelize([1,2,3,4,5,6], 3)

In [None]:
# filter: keep only the even numbers (remainder 0 when divided by 2).
num_rdd.filter(lambda n: not n % 2).collect()

In [None]:
# RDD of subject names (with duplicates) in 2 partitions. The distinct,
# zip, keys and groupBy cells below reuse it.
list_rdd = sc.parallelize(['Mathematics', 'Science', 'Mathematics', 'History', 'Biology', 'Science'], 2)

In [None]:
# distinct: remove duplicate subjects (output order may differ from input order).
list_rdd.distinct().collect()

In [None]:
set_1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
set_2 = sc.parallelize([6,7,8,9,10,11,12,13,14,15,16,17,18,19,20])

In [None]:
# intersection: elements present in both set_1 and set_2, without duplicates.
set_1.intersection(set_2).collect()

In [None]:
# Two small RDDs for the union examples; both contain 1 and 9.
x = sc.parallelize([1,2,4,9,16,25])
y = sc.parallelize([1,3,5,7,9,11,13])


In [None]:
# union: concatenate both RDDs; duplicates (1 and 9) are kept.
x.union(y).collect()

In [None]:
# distinct & union: union followed by distinct() removes the duplicates.
x.union(y).distinct().collect()

In [None]:
# map & zip: pair each subject with its string length. `a` is derived from
# list_rdd by map, so both RDDs have identical partitioning, as zip requires.
a = list_rdd.map(len)
list_rdd.zip(a).collect()

In [None]:
# flatMap: each element n expands to range(1, n+1), and the resulting
# ranges are flattened into one list.
num_rdd = sc.parallelize([1,2,3,4,5,6], 3)
num_rdd.flatMap(lambda x: range(1, x+1)).collect()

In [None]:
num_rdd = sc.parallelize([10, 15, 20], 2)
num_rdd.flatMap(lambda x: [x, x, x]).collect()

In [None]:
# keys: build (length, subject) pairs, then keep only the length keys.
a = list_rdd.keyBy(len)
a.keys().collect()

In [None]:
# cartesian: every (x, y) pair from the two RDDs, 3 x 3 = 9 tuples.
x = sc.parallelize([1,2,4])
y = sc.parallelize([7,9,11])
x.cartesian(y).collect()

In [None]:
# groupBy: group subjects by string length and collect the (length, group)
# pairs to the driver. Then print each length followed by its subjects.
# Note: x and y are notebook globals here; the loop rebinds them.
a = list_rdd.groupBy(lambda x: len(x)).collect()
for (x, y) in a:
    print (x)
    for i in y:
        print (i)

In [None]:
sorted([(x,sorted(y)) for(x,y) in a])

In [None]:
# keyBy: key each color by its string length, producing (length, color) pairs.
a = sc.parallelize(['blue', 'green', 'orange'])
b = sc.parallelize(['black', 'white', 'grey'])
c = a.keyBy(len)
c.collect()

In [None]:
# Same keying for the second color RDD: (length, color) pairs.
d = b.keyBy(len)
d.collect()

In [None]:
# join: inner join on the length key; only lengths present in both c and d appear.
c.join(d).collect()

In [None]:
# leftOuterJoin -- keep every key from the left RDD (here c); keys with no
# match in d get None as the right-hand value.
c.leftOuterJoin(d).collect()

In [None]:
# rightOuterJoin -- keep every key from the right RDD (here d); keys with no
# match in c get None as the left-hand value.
c.rightOuterJoin(d).collect()

In [None]:
# fullOuterJoin -- keep keys from both c and d; the side without a match gets None.
c.fullOuterJoin(d).collect()

In [None]:
# reduceByKey example, step 1: combine the two color RDDs a and b into one RDD.
f = a.union(b)
f.collect()

In [None]:
# Step 2: turn each color into a (length, color) pair.
g = f.keyBy(len)
g.collect()

In [None]:
# Step 3: join all colors sharing a length into one '/'-separated string.
# The order of colors within a key depends on partition/merge order and may vary.
h = g.reduceByKey(lambda x, y: x + '/' + y)
h.collect()

In [None]:
# Number of (length, colors) pairs in h, i.e. distinct lengths.
h.count()

In [None]:
# take(2): return the first two elements of h as a list.
h.take(2)

In [None]:
# first(): return the first element of h.
h.first()

In [None]:
# takeSample (withReplacement, num, [seed])
# Sample data for the two takeSample cells below.
num_rdd = sc.parallelize([10, 4, 5, 3, 11, 2, 6])

In [None]:
# Sample 3 elements without replacement. No seed is given, so results vary between runs.
num_rdd.takeSample(False, 3)

In [None]:
# Sample 3 elements with replacement: the same element may appear more than once.
num_rdd.takeSample(True, 3)

## DataFrames (DF)
DataFrames organize data into named columns, like a table, which makes the data easier to understand and query.

Before running the following exercises, copy the JSON files to HDFS:

```
hdfs dfs -mkdir /tmp
hdfs dfs -copyFromLocal authors.json /tmp
hdfs dfs -copyFromLocal authors_missing.json /tmp
```



In [None]:
# Create a SparkSession on top of the existing SparkContext `sc`.
# Without it, calling toDF() on an RDD fails with:
# 'PipelinedRDD' object has no attribute 'toDF' in PySpark

ss = SparkSession(sc)

In [None]:
# Sample data for the DataFrame examples.
colors = ['yellow', 'black', 'white', 'blue', 'green', 'brown', 'pink']

In [None]:
# Build a two-column DataFrame (color, length) from an RDD of (color, len) tuples via toDF.
color_df = sc.parallelize(colors).map(lambda x: (x, len(x))).toDF(['color', 'length'])

In [None]:
# Print the DataFrame's rows as a table.
color_df.show()

## Load data from JSON -- first copy the file to HDFS: `hdfs dfs -copyFromLocal authors.json /tmp/authors.json`

# Read the JSON file through the SparkSession `ss` created above.
# SQLContext is deprecated since Spark 3.0; SparkSession exposes the same
# DataFrameReader via `.read`.
df = ss.read.json('/tmp/authors.json')
df.show()

## DataFrame Operations

In [None]:
# The plain Python list that color_df was built from.
colors

In [None]:
# Display the DataFrame object itself; use show() to print its rows.
color_df

In [None]:
# (column name, type) pairs for every column.
color_df.dtypes

In [None]:
# Number of rows in the DataFrame.
color_df.count()

In [None]:
# Print the rows as a table.
color_df.show()

In [None]:
# List of column names.
color_df.columns

In [None]:
# drop() returns a new DataFrame without 'length'; color_df itself is unchanged.
color_df.drop('length').show()

In [None]:
# toJSON() converts each row to a JSON string; first() returns the first one.
color_df.toJSON().first()

In [None]:
# Colors with length between 4 and 5 (inclusive), shown under the alias 'mid_length'.
color_df.filter(color_df.length.between(4,5)).select(color_df.color.alias('mid_length')).show()

In [None]:
# Colors longer than 4 characters, excluding 'white', combined into one predicate.
color_df.filter((color_df.length > 4) & (color_df.color != 'white')).show()

In [None]:
# Rows sorted by color name, ascending.
color_df.sort('color').show()

In [None]:
# Colors longer than 4 characters, sorted by length then color, both descending.
color_df.filter(color_df['length'] > 4).sort('length', 'color', ascending=False).show()

In [None]:
# Sort ascending by length, then color, and return the first 4 rows to the driver.
color_df.orderBy('length', 'color').take(4)

In [None]:
# Mixed sort directions: length descending, then color ascending.
color_df.sort(color_df.length.desc(), color_df.color.asc()).show()

In [None]:
# Number of colors for each length.
color_df.groupBy('length').count().show()

In [None]:
## Load file
# Read authors_missing.json from HDFS. Presumably some records lack fields;
# the next cell drops rows containing nulls.
df1 = ss.read.json('/tmp/authors_missing.json')
df1.show()

In [None]:
# dropna(): return a new DataFrame without the rows that contain any null value.
df2 = df1.dropna()
df2.show()

## Example from Airline Performance data

Before running this section, prepare the airline traffic data by running the following commands in the `cisc-525-util` directory:

```
cd ~/cisc525/cisc-525-util
./prepare-hadoop-data.bash
```

In [None]:
# Get the SparkSession for the airline examples. `sc`/`ss` already exist,
# so getOrCreate() returns the active session. The name "performance-app"
# presumably does not rename the running application (NOTE(review): confirm
# for your Spark version). The placeholder .config("spark.config.option",
# "value") copied from the Spark docs configured nothing useful and was removed.
spark = SparkSession.builder.appName("performance-app").getOrCreate()

In [None]:
# Explicit schema for the airline CSV. Fields are listed in the order the
# columns are expected to appear; every field is nullable. Fields typed
# StringType are kept as raw text.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('Year', IntegerType()),
        ('Month', IntegerType()),
        ('DayOfMonth', IntegerType()),
        ('DayOfWeek', IntegerType()),
        ('DepTime', IntegerType()),
        ('CRSDepTime', IntegerType()),
        ('ArrTime', IntegerType()),
        ('CRSArrTime', IntegerType()),
        ('UniqueCarrier', StringType()),
        ('FlightNum', IntegerType()),
        ('TailNum', StringType()),
        ('ActualElapsedTime', StringType()),
        ('CRSElapsedTime', StringType()),
        ('AirTime', StringType()),
        ('ArrDelay', StringType()),
        ('DepDelay', IntegerType()),
        ('Origin', StringType()),
        ('Dest', StringType()),
        ('Distance', StringType()),
        ('TaxiIn', StringType()),
    ]
])

In [None]:
# HDFS path of the 1987 airline data (presumably placed there by prepare-hadoop-data.bash).
year = 'hdfs://localhost:9000/user/student/airline/1987.csv'

In [None]:
# Load the airline CSV with the explicit schema; the header line is skipped.
# The airline on-time CSVs appear to mark missing values with the literal
# string "NA" (TODO confirm for 1987.csv). Mapping "NA" to null lets
# IntegerType columns such as DepTime/DepDelay parse, instead of the
# values being treated as malformed.
df = spark.read.format('csv').option('header', 'true').option('nullValue', 'NA').schema(schema).load(year)

In [None]:
# Print the first rows of the flight data.
df.show()

In [None]:
# Print the schema as a tree, confirming the explicit types were applied.
df.printSchema()

In [None]:
# Flights per carrier. 'uniquecarrier' resolves to the 'UniqueCarrier' field
# because Spark column resolution is case-insensitive by default.
df.groupBy('uniquecarrier').count().show()

In [None]:
# Flights per origin airport.
df.groupBy('origin').count().show()

In [None]:
# Flights per destination airport.
df.groupBy('dest').count().show()

In [None]:
# Destinations ranked by how many flights had a departure delay greater than
# 40 (presumably minutes), most-delayed destinations first.
df.filter(df['depdelay'] > 40).groupBy('dest').count().orderBy(desc('count')).show()

In [None]:
# Carrier, origin and destination of each flight, ordered by origin.
df.select('uniquecarrier', 'origin', 'dest').orderBy('origin').show()

In [None]:
# Flights operated by carrier 'UA'.
df.filter(df['uniquecarrier'] == 'UA').show()

In [None]:
# Mean departure delay per carrier; null delays are ignored by the mean.
df.groupBy('uniquecarrier').mean('depdelay').show()

In [None]:
# Expose the flight DataFrame to SQL as the temp view `flights` and cache it.
# The previous bare `df.cache` (no parentheses) only referenced the method
# and cached nothing, so it was removed. cacheTable caches the view's plan,
# which is df's plan, so the SQL queries below reuse the cached data.
df.createOrReplaceTempView('flights')
spark.catalog.cacheTable('flights')

In [None]:
# SQL: the 5 flights with the largest departure delay among those delayed more than 40.
spark.sql('select uniquecarrier, origin, dest, depdelay from flights where depdelay > 40 order by depdelay desc limit 5').show()

In [None]:
# SQL equivalent of the groupBy/mean cell: average departure delay per carrier.
spark.sql('select uniquecarrier, avg(depdelay) from flights group by uniquecarrier').show()