# Introduction to Spark with Python
### This example is from the [Dataquest](https://www.dataquest.io/mission/123/introduction-to-spark/4/spark-context) lesson.
This notebook is a basic introduction to working with Resilient Distributed Datasets (RDDs), a core data structure in Spark, from Python.
Thanks to PySpark, we can work with RDDs in Python instead of Scala, the language Spark is written in.
In this notebook, we use a dataset that contains all of the guests of The Daily Show.

The records are labeled by:
<ul>
<li>Year</li>
<li>Occupation</li>
<li>Show (date)</li>
<li>Group</li>
<li>Guest List</li>
</ul>

## Testing the connection to the Spark cluster from the notebook
The SparkContext (sc) is the object that manages the connection to the Spark cluster and coordinates the processes running on it.

In [1]:
print(sc)

<pyspark.context.SparkContext object at 0x1048066d8>


In [2]:
print(sc.version)

2.1.0


## Note:
The key to understanding Spark is data pipelining. Every Spark job is a series of steps chained together and run in succession to form a pipeline. Each step in the pipeline returns either a Python value, a Python data structure, or an RDD object.

## Read in the CSV file of Daily Show guests

In [3]:
raw_data = sc.textFile("./data/daily_show_guests.csv")

In [4]:
raw_data.take(5)

['YEAR,GoogleKnowlege_Occupation,Show,Group,Raw_Guest_List',
 '1999,actor,1/11/99,Acting,Michael J. Fox',
 '1999,Comedian,1/12/99,Comedy,Sandra Bernhard',
 '1999,television actress,1/13/99,Acting,Tracey Ullman',
 '1999,film actress,1/14/99,Acting,Gillian Anderson']

## Split the RDD by the ',' delimiter
Since RDDs are immutable, a transformation cannot change an RDD in place; its result must be assigned to a new RDD.

In [5]:
daily_show = raw_data.map(lambda line: line.split(','))
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]
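Splitting on every comma assumes that no field contains a comma itself. The sketch below is a minimal illustration of a more defensive parser built on Python's standard csv module. The helper name parse_csv_line and the quoted sample row are hypothetical and are not taken from the dataset.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line into a list of fields, honoring quoted commas."""
    # csv.reader expects an iterable of lines, so wrap the single line in a list
    return next(csv.reader([line]))

# A row from the dataset preview: both approaches agree
plain_row = '1999,actor,1/11/99,Acting,Michael J. Fox'
plain_fields = parse_csv_line(plain_row)

# Hypothetical row with a quoted comma: a naive split produces an extra field
quoted_row = '2005,"writer, producer",1/1/05,Media,Hypothetical Guest'
naive_fields = quoted_row.split(',')
parsed_fields = parse_csv_line(quoted_row)

print(len(naive_fields), len(parsed_fields))
```

If the data did contain quoted fields, a function like this could presumably be passed to map in place of the lambda, for example raw_data.map(parse_csv_line).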

## Notice tally does not print because of Spark's lazy evaluation
Because of lazy evaluation, PySpark delays executing transformations such as map( ) and reduceByKey( ) until an action actually needs their result (e.g. take( ) to preview the first few elements in tally).

In [12]:
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(tally)

PythonRDD[21] at RDD at PythonRDD.scala:48
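The same idea can be seen without a cluster. The sketch below is a plain-Python analogy, not Spark code: Python 3's built-in map is also lazy, so the function runs only when elements are requested. The sample lines and the calls list are hypothetical and exist only for illustration.

```python
import itertools

calls = []  # records every line that actually gets processed

def split_line(line):
    calls.append(line)
    return line.split(',')

lines = ['YEAR,Show', '1999,1/11/99', '1999,1/12/99']

# Like an RDD transformation, map() only describes the work; nothing runs yet
lazy_rows = map(split_line, lines)
calls_before = len(calls)

# Requesting elements forces evaluation, much like the take() action
first_two = list(itertools.islice(lazy_rows, 2))
calls_after = len(calls)

print(calls_before, calls_after)
```

Only the two requested lines are processed; the third is never touched.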


In [13]:
tally.collect()

[('YEAR', 1),
 ('2012', 164),
 ('2013', 166),
 ('2004', 164),
 ('2011', 163),
 ('2014', 163),
 ('2002', 159),
 ('2007', 141),
 ('2015', 100),
 ('2003', 166),
 ('2010', 165),
 ('2001', 157),
 ('2000', 169),
 ('2008', 164),
 ('2005', 162),
 ('2009', 163),
 ('1999', 166),
 ('2006', 161)]
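To make the result above easier to read, here is a plain-Python sketch of what map followed by reduceByKey computes. It is an analogy only: Spark distributes this work across partitions, while the hypothetical helper reduce_by_key below runs on a small in-memory list of made-up rows.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# Hypothetical rows in the same shape as daily_show (year first)
rows = [['1999', 'actor'], ['1999', 'Comedian'], ['2000', 'actor']]

# Same two steps as the notebook: emit (year, 1), then sum the ones per year
pairs = [(row[0], 1) for row in rows]
tally_sketch = reduce_by_key(pairs, lambda x, y: x + y)

print(tally_sketch)
```

Each key ends up with the sum of its ones, which is why the header row appears in the output above as ('YEAR', 1).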

## Unlike pandas, PySpark does not treat the header row specially when reading a text file, so we need to remove it ourselves

In [16]:
def filter_year(line):
    # Keep every row except the header, whose first field is the literal 'YEAR'
    return line[0] != 'YEAR'

# filter() keeps only the rows for which filter_year returns True
filtered_daily_show = daily_show.filter(filter_year)

## All together now
To flex Spark's muscles, we'll chain a series of data transformations into a pipeline and observe Spark managing everything in the background. Previously, running many tasks in succession in Hadoop was very time-consuming, since intermediate results had to be written to disk and Hadoop wasn't aware of the full pipeline.

Thanks to Spark's aggressive use of memory (with disk used only as a backup and for specific tasks) and its well-architected core, Spark improves significantly on Hadoop's turnaround time.

In the following code, we'll filter out guests with no profession listed, lowercase each profession, generate a histogram of professions, and output the first 5 tuples in the histogram.

In [20]:
filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x, y: x + y) \
                   .take(5)

[('radio personality', 3),
 ('former governor of new york', 1),
 ('illustrator', 1),
 ('presidnet', 3),
 ('former united states secretary of state', 6)]
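The logic of this pipeline can be checked locally on a handful of rows before running it on a cluster. The sketch below is a plain-Python analogy using generators and collections.Counter, not Spark code. The first two sample rows come from the dataset preview earlier in the notebook; the last two are hypothetical.

```python
from collections import Counter

sample_rows = [
    ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
    ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
    ['1999', 'Actor', '1/13/99', 'Acting', 'Hypothetical Guest A'],  # hypothetical
    ['1999', '', '1/14/99', 'NA', 'Hypothetical Guest B'],           # hypothetical
]

# Step 1: drop rows with no occupation (mirrors the filter step)
with_occupation = (row for row in sample_rows if row[1] != '')

# Step 2: lowercase each occupation (mirrors the map step)
occupations = (row[1].lower() for row in with_occupation)

# Step 3: count occurrences (mirrors reduceByKey with addition)
histogram = Counter(occupations)

# Unlike take(5), most_common(5) sorts by count before returning
top_five = histogram.most_common(5)
print(top_five)
```

Note the difference in the last step: take(5) returns the first five tuples Spark encounters, in no particular order, whereas this sketch sorts by count. To get the most frequent professions from the RDD itself, an ordering action such as takeOrdered with a key function would likely be the place to start.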

All copyrights belong to their respective owners including Dataquest.

Code and text are used here only for education, and are not intended to generate income.