## Spark Assigment 1 by Ankit Malviya

Data Download: https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/flight-data



In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pandas as pd
#creating spark context
sc = SparkContext()
spark = SparkSession(sc)

#### Question 1)	Read data with all suitable options like header and others? Also, try to read by different command.


In [2]:

flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("2015-summary.csv")
flightData2015.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

#### Reading the data through pandas

In [60]:

pandas_data_frame = pd.read_csv('2015-summary.csv')
pandas_data_frame.head()


Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,15
1,United States,Croatia,1
2,United States,Ireland,344
3,Egypt,United States,15
4,United States,India,62


#### Question 2)	Sort the data in descending order of count and store top 20 rows in a new dataframe?

In [61]:
### implementing it in sql

from pyspark.sql.functions import desc

flightData2015.createOrReplaceTempView("flight_data_2015")
Top20_Sql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 20 """)

Top20_Sql.show()



+------------------+-----------------+
| DEST_COUNTRY_NAME|destination_total|
+------------------+-----------------+
|     United States|           411352|
|            Canada|             8399|
|            Mexico|             7140|
|    United Kingdom|             2025|
|             Japan|             1548|
|           Germany|             1468|
|Dominican Republic|             1353|
|       South Korea|             1048|
|       The Bahamas|              955|
|            France|              935|
|          Colombia|              873|
|            Brazil|              853|
|       Netherlands|              776|
|             China|              772|
|           Jamaica|              666|
|        Costa Rica|              588|
|       El Salvador|              561|
|            Panama|              510|
|              Cuba|              466|
|             Spain|              420|
+------------------+-----------------+



In [3]:
### implementing it in python

from pyspark.sql.functions import desc

sorted_flight_df = flightData2015.sort(desc("count")).limit(20)
sorted_flight_df.show()

+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|     United States|      United States|370002|
|     United States|             Canada|  8483|
|            Canada|      United States|  8399|
|     United States|             Mexico|  7187|
|            Mexico|      United States|  7140|
|    United Kingdom|      United States|  2025|
|     United States|     United Kingdom|  1970|
|             Japan|      United States|  1548|
|     United States|              Japan|  1496|
|           Germany|      United States|  1468|
|     United States| Dominican Republic|  1420|
|Dominican Republic|      United States|  1353|
|     United States|            Germany|  1336|
|       South Korea|      United States|  1048|
|     United States|        The Bahamas|   986|
|       The Bahamas|      United States|   955|
|     United States|             France|   952|
|            France|      United States|

In [4]:
### casting the type of Count

from pyspark.sql.types import DoubleType

Type_casted_df = sorted_flight_df.withColumn("count", flightData2015["count"].cast(DoubleType()))

Type_casted_df.show()

+------------------+-------------------+--------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|   count|
+------------------+-------------------+--------+
|     United States|      United States|370002.0|
|     United States|             Canada|  8483.0|
|            Canada|      United States|  8399.0|
|     United States|             Mexico|  7187.0|
|            Mexico|      United States|  7140.0|
|    United Kingdom|      United States|  2025.0|
|     United States|     United Kingdom|  1970.0|
|             Japan|      United States|  1548.0|
|     United States|              Japan|  1496.0|
|           Germany|      United States|  1468.0|
|     United States| Dominican Republic|  1420.0|
|Dominican Republic|      United States|  1353.0|
|     United States|            Germany|  1336.0|
|       South Korea|      United States|  1048.0|
|     United States|        The Bahamas|   986.0|
|       The Bahamas|      United States|   955.0|
|     United States|             France|   952.0|


In [6]:
### findint total number of flights from a particular destination

from pyspark.sql.functions import desc

flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(20).show()


+------------------+-----------------+
| DEST_COUNTRY_NAME|destination_total|
+------------------+-----------------+
|     United States|           411352|
|            Canada|             8399|
|            Mexico|             7140|
|    United Kingdom|             2025|
|             Japan|             1548|
|           Germany|             1468|
|Dominican Republic|             1353|
|       South Korea|             1048|
|       The Bahamas|              955|
|            France|              935|
|          Colombia|              873|
|            Brazil|              853|
|       Netherlands|              776|
|             China|              772|
|           Jamaica|              666|
|        Costa Rica|              588|
|       El Salvador|              561|
|            Panama|              510|
|              Cuba|              466|
|             Spain|              420|
+------------------+-----------------+



#### Question 3)	How can we create a temporary view of the dataframe?

In [7]:


flightData2015.createOrReplaceTempView("flight_data_2015")
flightData2015.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

#### Question 4)	Register dataframe as a table?

In [8]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

sqlContext.registerDataFrameAsTable(flightData2015, "table1")

#sqlContext.dropTempTable("table1")

#### Question 5)	Count of maximum destination country and sort them. Compare spark SQL with Dataframe.

In [9]:
from pyspark.sql.functions import desc

flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).show()




+------------------+-----------------+
| DEST_COUNTRY_NAME|destination_total|
+------------------+-----------------+
|     United States|           411352|
|            Canada|             8399|
|            Mexico|             7140|
|    United Kingdom|             2025|
|             Japan|             1548|
|           Germany|             1468|
|Dominican Republic|             1353|
|       South Korea|             1048|
|       The Bahamas|              955|
|            France|              935|
|          Colombia|              873|
|            Brazil|              853|
|       Netherlands|              776|
|             China|              772|
|           Jamaica|              666|
|        Costa Rica|              588|
|       El Salvador|              561|
|            Panama|              510|
|              Cuba|              466|
|             Spain|              420|
+------------------+-----------------+
only showing top 20 rows



#### Question 6)	Find maximum number of flights to and from any given location? Perform the task using Python and SQL both.

In [10]:
#for max count from a country
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [11]:

#for the flights from US to Japan in python
flightData2015.where(flightData2015.DEST_COUNTRY_NAME == 'Japan').show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Japan|      United States| 1548|
+-----------------+-------------------+-----+



In [70]:
## for the flights from US to Japan in SQL
max_dest_df = spark.sql(""" SELECT DEST_COUNTRY_NAME,sum(count) as total_flights_to FROM flight_data_2015  WHERE DEST_COUNTRY_NAME = "Japan" GROUP BY DEST_COUNTRY_NAME """).show()


+-----------------+----------------+
|DEST_COUNTRY_NAME|total_flights_to|
+-----------------+----------------+
|            Japan|            1548|
+-----------------+----------------+



In [12]:
# Maximun No. of flighs from Japan to United State in Python
flightData2015.where(flightData2015.DEST_COUNTRY_NAME == 'Japan').show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Japan|      United States| 1548|
+-----------------+-------------------+-----+



In [13]:
# Maximun No. of flighs from Japan to United State in SQL.
max_dest_df = spark.sql("""SELECT ORIGIN_COUNTRY_NAME,sum(count) as total_flights_frm FROM flight_data_2015 WHERE ORIGIN_COUNTRY_NAME = "Japan" GROUP BY ORIGIN_COUNTRY_NAME""").show()


+-------------------+-----------------+
|ORIGIN_COUNTRY_NAME|total_flights_frm|
+-------------------+-----------------+
|              Japan|             1496|
+-------------------+-----------------+



In [14]:
# Maximun No. of flighs from Japan to United State in Python
flightData2015.where(flightData2015.ORIGIN_COUNTRY_NAME == 'Japan').show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|              Japan| 1496|
+-----------------+-------------------+-----+



#### 7)	Find out, top 5 destination countries in the data using both Python and SQL.


In [16]:
# top 5 destination countries in the data using SQL

maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5 """)


maxSql.show()



+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [17]:
# # top 5 destination countries in the data using Python
Type_casted_df.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).show()



+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|         394752.0|
|           Canada|           8399.0|
|           Mexico|           7140.0|
|   United Kingdom|           2025.0|
|            Japan|           1548.0|
+-----------------+-----------------+



### Text Data Analysis:

In [18]:
RDDread=sc.textFile('Text_Analysis.txt')
RDDread.first()

u'As a group of frogs was traveling through the woods, two of them fell into a deep pit. When the other frogs crowded around the pit and saw how deep it was, they told the two frogs that there was no hope left for them.'

In [20]:
#1. Convert paragraph into the lines of a document using space.
#Reading text file
sentences = sc.textFile("Text_Analysis.txt").map(lambda x: "".join(x)).flatMap(lambda x: x.split("."))
lines = sentences.collect()

# Print the lines
for sentence in lines:
    print(sentence)

As a group of frogs was traveling through the woods, two of them fell into a deep pit
 When the other frogs crowded around the pit and saw how deep it was, they told the two frogs that there was no hope left for them


However, the two frogs decided to ignore what the others were saying and they proceeded to try and jump out of the pit
 Despite their efforts, the group of frogs at the top of the pit were still saying that they should just give up
 That they would never make it out


Eventually, one of the frogs took heed to what the others were saying and he gave up, falling down to his death
 The other frog continued to jump as hard as he could
 Again, the crowd of frogs yelled at him to stop the pain and just die


He jumped even harder and finally made it out
 When he got out, the other frogs said, �Did you not hear us?�

The frog explained to them that he was deaf
 He thought they were encouraging him the entire time


Moral of the story: People�s words can have a big effect on oth

### 2. Remove the stop words.


In [34]:
import re
import nltk
from nltk.corpus import stopwords

split_regex = r'\W+'
def simpleTokenize(string):
    return filter(lambda strg: len(strg) > 0, re.split(split_regex, string.lower()))


stop_words = set(stopwords.words("english"))
print 'These are the stopwords: %s' % stop_words


def tokenize(string):
    return filter(lambda strg: strg not in stop_words, simpleTokenize(string))


filtter_sentences = sentences.flatMap(tokenize)

filtter_sentences.take(10)

These are the stopwords: set([u'all', u'just', u"don't", u'being', u'over', u'both', u'through', u'yourselves', u'its', u'before', u'o', u'don', u'hadn', u'herself', u'll', u'had', u'should', u'to', u'only', u'won', u'under', u'ours', u'has', u"should've", u"haven't", u'do', u'them', u'his', u'very', u"you've", u'they', u'not', u'during', u'now', u'him', u'nor', u"wasn't", u'd', u'did', u'didn', u'this', u'she', u'each', u'further', u"won't", u'where', u"mustn't", u"isn't", u'few', u'because', u"you'd", u'doing', u'some', u'hasn', u"hasn't", u'are', u'our', u'ourselves', u'out', u'what', u'for', u"needn't", u'below', u're', u'does', u"shouldn't", u'above', u'between', u'mustn', u't', u'be', u'we', u'who', u"mightn't", u"doesn't", u'were', u'here', u'shouldn', u'hers', u"aren't", u'by', u'on', u'about', u'couldn', u'of', u"wouldn't", u'against', u's', u'isn', u'or', u'own', u'into', u'yourself', u'down', u"hadn't", u'mightn', u"couldn't", u'wasn', u'your', u"you're", u'from', u'her', u'

[u'group',
 u'frogs',
 u'traveling',
 u'woods',
 u'two',
 u'fell',
 u'deep',
 u'pit',
 u'frogs',
 u'crowded']

#### 3)	Group the words in above rdd based on the first 3 characters of each words.


In [37]:
same_word_grp = filtter_sentences.groupBy(lambda w: w[0:3])
print [(k, list(v)) for (k, v) in same_word_grp.take(5)]


[(u'sai', [u'said']), (u'pro', [u'proceeded']), (u'say', [u'saying', u'saying', u'saying', u'say']), (u'saw', [u'saw']), (u'tho', [u'thought'])]


#### 4)	Calculate the word frequency in the corpus. // Task1 complete


In [38]:
from operator import add

counts = sentences.flatMap(lambda x: [(w.lower(), 1) for w in x.split()]).reduceByKey(add)

output = counts.collect()
for (word, count) in output:
    print "%s: %i" % (word, count)



and: 7
said,: 1
words: 1
eventually,: 1
frogs: 8
into: 1
deaf: 1
life: 1
deep: 2
jump: 2
as: 3
through: 1
thought: 1
have: 1
pit: 4
even: 1
ignore: 1
up,: 1
group: 2
just: 3
for: 1
saw: 1
top: 1
when: 2
out,: 1
frog: 2
finally: 1
lives: 1
falling: 1
was: 3
hope: 1
them: 3
pain: 1
again,: 1
us?�: 1
stop: 1
what: 3
you: 2
hear: 1
they: 5
not: 1
proceeded: 1
continued: 1
the: 20
a: 3
down: 1
about: 1
would: 1
woods,: 1
might: 1
could: 1
harder: 1
say: 1
crowd: 1
try: 1
�did: 1
heed: 1
moral: 1
however,: 1
mouth: 1
make: 1
comes: 1
saying: 3
give: 1
story:: 1
others: 2
it: 5
one: 1
decided: 1
traveling: 1
left: 1
still: 1
their: 1
out: 4
death: 2
encouraging: 1
no: 1
other�s: 1
there: 1
two: 3
should: 1
to: 7
other: 3
between: 1
jumped: 1
before: 1
told: 1
be: 1
his: 1
around: 1
big: 1
that: 4
got: 1
never: 1
took: 1
effect: 1
how: 1
were: 4
fell: 1
was,: 1
despite: 1
explained: 1
entire: 1
difference: 1
him: 2
he: 6
your: 1
on: 1
made: 1
hard: 1
crowded: 1
die: 1
up: 1
can: 1
efforts,: 1


### Perform some other transformations:

#### a)	Create sample from complete data.

In [39]:
sample_without_rep1 = filtter_sentences.sample(False, 0.3,35)      ##70% sample without Replacement
sample_without_rep2 = filtter_sentences.sample(False, 0.3,35)
print(filtter_sentences.count())
print(sample_without_rep1.count())
print(sample_without_rep2.count())

sample_without_rep1.take(2)
sample_without_rep2.take(2)

95
33
33


[u'group', u'pit']

In [40]:
sample_with_rep1 = filtter_sentences.sample(True, 0.3, 35)      ##70% sample with Replacement
sample_with_rep2 = filtter_sentences.sample(True, 0.3, 35)
print(filtter_sentences.count())
print(sample_with_rep1.count())
print(sample_with_rep2.count())
sample_with_rep.take(2)

95
26
26


NameError: name 'sample_with_rep' is not defined

#### b)	Union (create 2 sample and try doing union transformation)

In [41]:
union_data = sample_without_rep1.union(sample_without_rep2)
print(sample_without_rep1.count())
print(sample_without_rep2.count())
print(union_data.count())
union_data.take(5)

33
33
66


[u'group', u'pit', u'around', u'saw', u'deep']

In [42]:
union_data = sample_with_rep1.union(sample_with_rep2)
print(sample_with_rep1.count())
print(sample_with_rep2.count())
print(union_data.count())
union_data.take(5)

26
26
52


[u'two', u'fell', u'frogs', u'however', u'two']

#### c)	Join (Using key and without key)

In [43]:
join_data = sample_with_rep1.join(sample_with_rep2)
print(sample_with_rep1.count())
print(sample_with_rep2.count())
print(join_data.count())
join_data.take(5)

26
26
72


[(u'm', (u'a', u'a')),
 (u'm', (u'a', u'i')),
 (u'm', (u'i', u'a')),
 (u'm', (u'i', u'i')),
 (u'e', (u'f', u'f'))]

#### d)	Distinct in rdd

In [44]:

filtter_sentences_distinct = filtter_sentences.distinct()
len(filtter_sentences_distinct.collect())



#filtter_sentences_distinct.take(20)


75