# Initiating PySpark / sparksession
________________________________

In [None]:

import os
import sys

# Point both the Spark driver and the workers at the interpreter running this
# kernel, so PySpark does not pick up a different Python from PATH.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# findspark.init() locates the Spark installation and makes pyspark
# importable, so it has to run before the pyspark imports below.
import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAppName('appName').setMaster('local')

# getOrCreate() reuses the context/session that is already running in this
# kernel instead of raising "Cannot run multiple SparkContexts at once" when
# the cell is executed a second time.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# SQLContext is deprecated in favour of SparkSession; it is kept only because
# a later cell reads a file through `sqlcontext`.
sqlcontext = SQLContext(sc)

# Theoretical Questions 
***********************************


## Question # 1    

### Narrow vs Wide Transformation
####    1. What are narrow and wide transformations?
####    2. Examples of Wide Transformations. Operations of Wide Transformations
####    3. Apply those operations on an RDD
    
### Ans:
    Transformations create a new RDD from an existing RDD. They are lazily evaluated: nothing is executed
    until an action is performed on the result.
    1. Narrow transformations are 1:1 transformations: each partition of the child RDD depends on only one partition of the parent RDD.
    Examples of narrow transformations are map(), flatMap(), filter(), union(), sample(), mapPartitions(), read(). No data transfer between partitions is required, so they can be computed locally.
    2. Wide transformations shuffle elements across partitions (1:N), which requires data transfer between nodes.
    Examples of wide transformations: sort(), distinct(), join(), intersection(), orderBy(), reduceByKey(), groupBy(),
    cartesian(), repartition(), coalesce() (only when called with shuffle=True; by default coalesce() avoids a shuffle)
        

## Question # 2

### Difference between Map and FlatMap 
#### Map transformation takes one element from an RDD and produces one element of a new RDD 
#### FlatMap transformation takes one element and produces one, zero or more elements of a new RDD


## Question # 3

### Advantages of spark Dataframes over RDDs?
#### Operations using Dataframes are automatically optimized


## Question # 4

### What is the module called for structured Data processing?
##### SparkSQL



# RDD
************

## Question # 1

### How to read a text file in PySpark? Show both ways: reading it into an RDD with the SparkContext, and reading it into a DataFrame.


In [2]:
# Load df.txt through the SparkContext. Despite its name, `df` is an RDD of
# text lines, not a DataFrame.
df = sc.textFile(name="df.txt")

In [3]:
# Echo the RDD object itself (its repr), not the lines it contains.
df

df.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
# Read the same file through the SparkSession; each line becomes one row of a
# single-column DataFrame.
df1 = spark.read.text(paths="df.txt")

In [5]:
# Echo the DataFrame's repr, which lists its column names and types.
df1

DataFrame[value: string]

In [6]:
# Same read as above, this time going through the SQLContext's reader.
df2 = sqlcontext.read.text(paths="df.txt")

In [7]:
# Echo the DataFrame's repr, which lists its column names and types.
df2

DataFrame[value: string]


## Question # 2

### Splitting an RDD containing string values

In [20]:
# Bring every line of the RDD back to the driver as a Python list of strings.
df.collect()

['Student\t% achieved in the assignments\tPass exam',
 'Bob\t36%\tNo',
 'Carol\t95%\tYes',
 'Dan\t63%\tYes',
 'Eve\t43%\tNo',
 'Frank\t84%\tYes',
 'Grace\t54%\tYes',
 'Heidi\t15%\tNo',
 'Ivan\t21%\tNo',
 'Judy\t91%\tYes',
 'Mallory\t34%\tNo']

In [21]:
# Remove the first row (the header) from the RDD.
#
# The lines are re-read from df.txt instead of being taken from `df`: this
# cell rebinds `df`, so on a second run `df.first()` would already be the
# first data row and that row would be filtered out as well.
lines_with_header = sc.textFile("df.txt")
firstRow = lines_with_header.first()

# Keep every line that differs from the header line.
df = lines_with_header.filter(lambda row: row != firstRow)

df.collect()

['Alice\t70%\tYes',
 'Bob\t36%\tNo',
 'Carol\t95%\tYes',
 'Dan\t63%\tYes',
 'Eve\t43%\tNo',
 'Frank\t84%\tYes',
 'Grace\t54%\tYes',
 'Heidi\t15%\tNo',
 'Ivan\t21%\tNo',
 'Judy\t91%\tYes',
 'Mallory\t34%\tNo']

In [22]:
# Break each tab-separated line into a list of its fields.
words_map = df.map(lambda line: line.split("\t"))

In [23]:
# Show the split records: one list of fields per input line.
words_map.collect()

[['Alice', '70%', 'Yes'],
 ['Bob', '36%', 'No'],
 ['Carol', '95%', 'Yes'],
 ['Dan', '63%', 'Yes'],
 ['Eve', '43%', 'No'],
 ['Frank', '84%', 'Yes'],
 ['Grace', '54%', 'Yes'],
 ['Heidi', '15%', 'No'],
 ['Ivan', '21%', 'No'],
 ['Judy', '91%', 'Yes'],
 ['Mallory', '34%', 'No']]

In [24]:
from pyspark.sql import Row


In [25]:
def _fields_to_row(fields):
    """Build a Row with named columns from one split record (name, percentage, pass flag)."""
    return Row(student=fields[0], percentage=fields[1], passed=fields[2])


# Give every positional field a column name by wrapping it in a Row.
dataMap = words_map.map(_fields_to_row)

In [26]:
# Show the Row objects; all three fields are still strings at this point.
dataMap.collect()

[Row(student='Alice', percentage='70%', passed='Yes'),
 Row(student='Bob', percentage='36%', passed='No'),
 Row(student='Carol', percentage='95%', passed='Yes'),
 Row(student='Dan', percentage='63%', passed='Yes'),
 Row(student='Eve', percentage='43%', passed='No'),
 Row(student='Frank', percentage='84%', passed='Yes'),
 Row(student='Grace', percentage='54%', passed='Yes'),
 Row(student='Heidi', percentage='15%', passed='No'),
 Row(student='Ivan', percentage='21%', passed='No'),
 Row(student='Judy', percentage='91%', passed='Yes'),
 Row(student='Mallory', percentage='34%', passed='No')]

In [15]:
# Turn the RDD of Rows into a DataFrame; the column names come from the Row fields.
dataMap_df = spark.createDataFrame(data=dataMap)

In [16]:
# Print the DataFrame as a text table.
dataMap_df.show()

+-------+----------+------+
|student|percentage|passed|
+-------+----------+------+
|  Alice|       70%|   Yes|
|    Bob|       36%|    No|
|  Carol|       95%|   Yes|
|    Dan|       63%|   Yes|
|    Eve|       43%|    No|
|  Frank|       84%|   Yes|
|  Grace|       54%|   Yes|
|  Heidi|       15%|    No|
|   Ivan|       21%|    No|
|   Judy|       91%|   Yes|
|Mallory|       34%|    No|
+-------+----------+------+




## Question # 3

### Working with parallelize function. Creating RDDs from parallelize() and mapping RDD to dataframe

In [27]:
# Build an RDD from an in-memory Python list with parallelize().
course_titles = [
    "Big Data",
    "Machine Learning",
    "Intro to Web Science",
    "Network Theory",
    "NPDEs",
    "Data Science",
]
courses_RDD = sc.parallelize(course_titles)

In [28]:
# Echo the RDD object itself (its repr), not its elements.
courses_RDD

ParallelCollectionRDD[19] at readRDDFromFile at PythonRDD.scala:274

In [29]:
# Bring the course titles back to the driver as a Python list.
courses_RDD.collect()

['Big Data',
 'Machine Learning',
 'Intro to Web Science',
 'Network Theory',
 'NPDEs',
 'Data Science']

In [54]:
# Split every title on commas. None of the titles contains a comma, so each
# one simply ends up wrapped in a single-element list.
courses_map = courses_RDD.map(lambda title: title.split(","))

In [55]:
# Show the result of the split: one single-element list per course.
courses_map.collect()

[['Big Data'],
 ['Machine Learning'],
 ['Intro to Web Science'],
 ['Network Theory'],
 ['NPDEs'],
 ['Data Science']]

In [59]:
# Wrap the first (and only) field of each record in a Row with a `course` column.
courses = courses_map.map(lambda fields: Row(course=fields[0]))

In [61]:
# Show the Row objects that will become the DataFrame's rows.
courses.collect()

[Row(course='Big Data'),
 Row(course='Machine Learning'),
 Row(course='Intro to Web Science'),
 Row(course='Network Theory'),
 Row(course='NPDEs'),
 Row(course='Data Science')]

In [62]:
# Convert the RDD of Rows into a DataFrame.
courses_df = spark.createDataFrame(data=courses)

In [63]:
# Echo the DataFrame's repr, which lists its column names and types.
courses_df

DataFrame[course: string]

In [64]:
# Print the DataFrame as a text table.
courses_df.show()

+--------------------+
|              course|
+--------------------+
|            Big Data|
|    Machine Learning|
|Intro to Web Science|
|      Network Theory|
|               NPDEs|
|        Data Science|
+--------------------+



### transforming the created RDD to a new RDD with key value pairs

In [67]:
# Starting point: the RDD of course titles created earlier with parallelize().
courses_RDD.collect()

['Big Data',
 'Machine Learning',
 'Intro to Web Science',
 'Network Theory',
 'NPDEs',
 'Data Science']

In [84]:
# Derive a pair RDD: each course title becomes the key and the constant "a"
# is attached as its value.
key_val_rdd = courses_RDD.map(lambda title: (title, "a"))

In [85]:
# Show the (key, value) tuples.
key_val_rdd.collect()

[('Big Data', 'a'),
 ('Machine Learning', 'a'),
 ('Intro to Web Science', 'a'),
 ('Network Theory', 'a'),
 ('NPDEs', 'a'),
 ('Data Science', 'a')]


## Question # 4

### Applying filter to an RDD

In [86]:
# Unfiltered course titles, shown for comparison with the filtered results below.
courses_RDD.collect()

['Big Data',
 'Machine Learning',
 'Intro to Web Science',
 'Network Theory',
 'NPDEs',
 'Data Science']

In [126]:
# Keep only the titles whose first character is "B".
filtered_courses = courses_RDD.filter(lambda title: title[:1] == "B")

In [127]:
# Show the titles that passed the filter.
filtered_courses.collect()

['Big Data']

In [128]:
# Second example: keep the titles that contain the substring "Data".
def _mentions_data(title):
    """Return True when the title contains "Data" (case-sensitive)."""
    return "Data" in title


filtered_courses2 = courses_RDD.filter(_mentions_data)
filtered_courses2.collect()

['Big Data', 'Data Science']


# Data Frames 
_____________________________________

## Question # 1


### Read the flightdata csv file (link below), which has three columns: Destination Country Name, Origin Country Name and Count. Your task is to learn about the structure (data types) of the data.

https://github.com/databricks/Spark-The-Definitive-Guide/blob/master/data/flight-data/csv/2015-summary.csv

In [129]:
# Load the flight summary CSV into a DataFrame. The first line supplies the
# column names and Spark infers each column's type from the data.
flight_df = spark.read.csv(
    path="2015-summary.csv",
    inferSchema=True,
    header=True,
)


In [130]:
# Preview the first five rows only.
flight_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [131]:
# Print the schema as a tree: column name, type and nullability.
flight_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [133]:
# The same type information as a list of (column name, type name) tuples.
flight_df.dtypes

[('DEST_COUNTRY_NAME', 'string'),
 ('ORIGIN_COUNTRY_NAME', 'string'),
 ('count', 'int')]

## Question # 2

### Find the maximum number of flights from Origin country to Destination country by reading flightdata.

In [150]:
# Order the routes from most to fewest flights; the top row is the maximum.
flights_by_count = flight_df.orderBy("count", ascending=False)
flights_by_count.show()

+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|     United States|      United States|370002|
|     United States|             Canada|  8483|
|            Canada|      United States|  8399|
|     United States|             Mexico|  7187|
|            Mexico|      United States|  7140|
|    United Kingdom|      United States|  2025|
|     United States|     United Kingdom|  1970|
|             Japan|      United States|  1548|
|     United States|              Japan|  1496|
|           Germany|      United States|  1468|
|     United States| Dominican Republic|  1420|
|Dominican Republic|      United States|  1353|
|     United States|            Germany|  1336|
|       South Korea|      United States|  1048|
|     United States|        The Bahamas|   986|
|       The Bahamas|      United States|   955|
|     United States|             France|   952|
|            France|      United States|

In [146]:
# Compute the maximum directly with an aggregation over the whole DataFrame.
max_flights = flight_df.agg({"count": "max"})
max_flights.show()

+----------+
|max(count)|
+----------+
|    370002|
+----------+



## Question # 3

### Count the rows where the number of flights from Origin country to Destination country is greater than 10, reading the flightdata again.



In [168]:
# Keep the routes with more than 10 flights, then count how many rows remain.
routes_over_10 = flight_df.where(flight_df["count"] > 10)
routes_over_10.count()

208

## Question # 4

### Count the total number of flights whose destination country name is United States.



In [169]:
# List the column names, to look up the destination column used in the next cell.
flight_df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [172]:
# Restrict to rows whose destination is the United States, then add up their
# flight counts.
to_united_states = flight_df.where(flight_df.DEST_COUNTRY_NAME == "United States")
to_united_states.agg({"count": "sum"}).show()

+----------+
|sum(count)|
+----------+
|    411352|
+----------+



In [192]:
# Total flights per destination country, listed in reverse alphabetical order
# of the country name.
flights_per_destination = flight_df.groupBy("DEST_COUNTRY_NAME").agg({"count": "sum"})
flights_per_destination.orderBy("DEST_COUNTRY_NAME", ascending=False).show()

+--------------------+----------+
|   DEST_COUNTRY_NAME|sum(count)|
+--------------------+----------+
|              Zambia|         1|
|           Venezuela|       290|
|             Uruguay|        43|
|       United States|    411352|
|      United Kingdom|      2025|
|United Arab Emirates|       320|
|             Ukraine|        14|
|Turks and Caicos ...|       230|
|              Turkey|       138|
|             Tunisia|         3|
| Trinidad and Tobago|       211|
|         The Bahamas|       955|
|            Thailand|         3|
|              Taiwan|       266|
|         Switzerland|       294|
|              Sweden|       118|
|            Suriname|         1|
|               Spain|       420|
|         South Korea|      1048|
|        South Africa|        36|
+--------------------+----------+
only showing top 20 rows

