In [20]:

# Main Source -- http://spark.apache.org/docs/2.1.0/api/python/pyspark.html
# Source --- DATAQuest --- https://www.dataquest.io/blog/apache-spark/


# Start a fresh SparkContext for this notebook.
# findspark.init() must run before `import pyspark` so that pyspark is importable
# from the local Spark installation.
import findspark
findspark.init()
import pyspark

# Stop a SparkContext left over from a previous run of this cell (PySpark will not
# start a second active context). On a fresh kernel `sc` is not defined yet, and an
# unconditional `sc.stop()` would raise NameError and break Restart & Run All.
if 'sc' in globals():
    sc.stop()

sc = pyspark.SparkContext(appName="Daily_Show_Test1")

# Optional sanity check. The repr contains the object's memory address, so the
# output differs on every run, e.g.
# <pyspark.context.SparkContext object at 0x7f4148bd26d0>
print(sc)



<pyspark.context.SparkContext object at 0x7f4125d6cf10>


In [21]:

# Setup so far: PySpark initialised via findspark and a SparkContext (`sc`) started.
#
# Data: the Daily Show guest list CSV from the DataQuest tutorial linked above,
# converted to TSV with the shell command:   < filename.csv tr "," "\t" > tabfile
# (source: http://www.linuxquestions.org/questions/programming-9/convert-csv-to-tab-683201/)
# NOTE(review): `tr` replaces *every* comma, including any inside quoted CSV
# fields, which would split such a field into extra columns -- verify the TSV.
#
# raw_d is an RDD (Resilient Distributed Dataset), Spark's basic abstraction.
# textFile() is lazy: the file is not actually read until an action needs it.
TSV_PATH = "DATA_Files/dsT1.tsv"
raw_d = sc.textFile(TSV_PATH)

# An RDD may look like a Python list, but it is not one: its data lives in
# partitions (simulated in local memory when Spark runs locally), so elements
# cannot be accessed with [] indexing. The action take(n) triggers the read and
# returns the first n lines -- here the first 15 -- to the driver.
raw_d.take(15)


[u'YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 u'1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 u'1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 u'1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 u'1999\tfilm actress\t1/14/99\tActing\tGillian Anderson',
 u'1999\tactor\t1/18/99\tActing\tDavid Alan Grier',
 u'1999\tactor\t1/19/99\tActing\tWilliam Baldwin',
 u'1999\tSinger-lyricist\t1/20/99\tMusician\tMichael Stipe',
 u'1999\tmodel\t1/21/99\tMedia\tCarmen Electra',
 u'1999\tactor\t1/25/99\tActing\tMatthew Lillard',
 u'1999\tstand-up comedian\t1/26/99\tComedy\tDavid Cross',
 u'1999\tactress\t1/27/99\tActing\tYasmine Bleeth',
 u'1999\tactor\t1/28/99\tActing\tD. L. Hughley',
 u'1999\ttelevision actress\t10/18/99\tActing\tRebecca Gayheart',
 u'1999\tComedian\t10/19/99\tComedy\tSteven Wright']

In [22]:
# Turn every raw line into its list of column values.
# map() applies a function to each element of the RDD; it is a transformation,
# so it is lazy and returns a new RDD (a pipelined RDD) rather than data.
# take() is an action: it runs the pipeline and returns rows to the driver.
# (Printing `daily_show` itself would only show its repr, e.g.
#  PythonRDD[12] at RDD at PythonRDD.scala:48.)
def split_tsv_line(line):
    """Split one raw TSV line on tab characters into a list of field strings."""
    return line.split('\t')

daily_show = raw_d.map(split_tsv_line)
daily_show.take(5)

[[u'YEAR', u'GoogleKnowlege_Occupation', u'Show', u'Group', u'Raw_Guest_List'],
 [u'1999', u'actor', u'1/11/99', u'Acting', u'Michael J. Fox'],
 [u'1999', u'Comedian', u'1/12/99', u'Comedy', u'Sandra Bernhard'],
 [u'1999', u'television actress', u'1/13/99', u'Acting', u'Tracey Ullman'],
 [u'1999', u'film actress', u'1/14/99', u'Acting', u'Gillian Anderson']]

In [9]:
# Explanatory note on Spark's two kinds of operations (transformations vs. actions).
# NOTE(review): as a bare string expression it is echoed as the cell's output;
# this prose would read better as a markdown cell.
'''Transformations are lazy operations and always return a reference to an RDD object.
The transformation, however, is not actually run until an action needs to use the
resulting RDD from a transformation.

Any function that returns an RDD is a transformation
and any function that returns a value is an action.


'''


'Transformations are lazy operations and always return a reference to an RDD object.\nThe transformation, however, is not actually run until an action needs to use the\nresulting RDD from a transformation.\n\nAny function that returns an RDD is a transformation\nand any function that returns a value is an action.\n\n\n'

In [25]:
# Count rows (guest appearances) per year.
# map() turns every row into a (year, 1) pair; reduceByKey() then sums the 1s
# for each distinct year. Both are transformations, so `tally` is still a lazy
# RDD and nothing has been computed yet.
tally = (daily_show
         .map(lambda row: (row[0], 1))
         .reduceByKey(lambda a, b: a + b))

# Printing the RDD object shows only its repr (e.g. PythonRDD[18] at RDD at
# PythonRDD.scala:48), not the counts themselves.
print(tally)

print("  ")
print("__#___" * 20)
print("  ")

# take() is an action: it runs the job and returns (year, count) pairs, where the
# count is the number of rows with that YEAR value, e.g. (u'1999', 166).
# The header line is counted as well, showing up as the key u'YEAR'.
print(tally.take(20))

PythonRDD[21] at RDD at PythonRDD.scala:48
  
__#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#_____#___
  
[(u'1999', 166), (u'2002', 159), (u'2000', 169), (u'2006', 161), (u'2004', 164), (u'2015', 100), (u'2008', 164), (u'2011', 163), (u'2013', 166), (u'2005', 162), (u'2003', 166), (u'2001', 157), (u'2007', 141), (u'YEAR', 1), (u'2014', 163), (u'2009', 163), (u'2010', 165), (u'2012', 164)]


In [12]:
## `tally` holds the frequency of each YEAR value -- effectively a histogram.
## Each element is a (year, count) tuple, e.g. (u'1999', 166).
## An RDD has no len(), so to see every element we bring them all back to the
## driver. collect() does that in a single job; the previous form,
## tally.take(tally.count()), returns the same list but runs two jobs
## (one to count, one to fetch).

tally.collect()

# The (u'YEAR', 1) entry below is the TSV's header line, counted once as if it
# were data: textFile() reads every line as a record and has no notion of a
# header row (unlike, e.g., pandas.read_csv). It carries no information, so it
# has to be filtered out.



[(u'1999', 166),
 (u'2002', 159),
 (u'2000', 169),
 (u'2006', 161),
 (u'2004', 164),
 (u'2015', 100),
 (u'2008', 164),
 (u'2011', 163),
 (u'2013', 166),
 (u'2005', 162),
 (u'2003', 166),
 (u'2001', 157),
 (u'2007', 141),
 (u'YEAR', 1),
 (u'2014', 163),
 (u'2009', 163),
 (u'2010', 165),
 (u'2012', 164)]

In [26]:

# Spark RDDs are immutable (much like Python tuples): once created, their
# contents cannot be changed, so the pseudo-header row [u'YEAR', ...] cannot be
# deleted from `daily_show` in place. Instead, filter() builds a new RDD that
# keeps only the rows a predicate accepts.


def filter_year(line):
    """Return whether a split row is real data rather than the header row.

    `line` is one row of `daily_show`: the list of column strings obtained by
    splitting a TSV line on tabs. The header row is the only row whose first
    column is the literal text 'YEAR'.

    Returns False for the header row and True for every other row.
    """
    is_header = line[0] == 'YEAR'
    return not is_header
    
# Build a new RDD without the header row. filter_year already takes one row and
# returns a bool, so it can be handed to filter() directly -- no lambda needed.
filtered_daily_show = daily_show.filter(filter_year)

print(filtered_daily_show.take(20))

# The output below lists the first 20 data rows; the [u'YEAR', ...] header is gone.

[[u'1999', u'actor', u'1/11/99', u'Acting', u'Michael J. Fox'], [u'1999', u'Comedian', u'1/12/99', u'Comedy', u'Sandra Bernhard'], [u'1999', u'television actress', u'1/13/99', u'Acting', u'Tracey Ullman'], [u'1999', u'film actress', u'1/14/99', u'Acting', u'Gillian Anderson'], [u'1999', u'actor', u'1/18/99', u'Acting', u'David Alan Grier'], [u'1999', u'actor', u'1/19/99', u'Acting', u'William Baldwin'], [u'1999', u'Singer-lyricist', u'1/20/99', u'Musician', u'Michael Stipe'], [u'1999', u'model', u'1/21/99', u'Media', u'Carmen Electra'], [u'1999', u'actor', u'1/25/99', u'Acting', u'Matthew Lillard'], [u'1999', u'stand-up comedian', u'1/26/99', u'Comedy', u'David Cross'], [u'1999', u'actress', u'1/27/99', u'Acting', u'Yasmine Bleeth'], [u'1999', u'actor', u'1/28/99', u'Acting', u'D. L. Hughley'], [u'1999', u'television actress', u'10/18/99', u'Acting', u'Rebecca Gayheart'], [u'1999', u'Comedian', u'10/19/99', u'Comedy', u'Steven Wright'], [u'1999', u'actress', u'10/20/99', u'Acting', u'A

In [16]:
## 11. All together now ##
# Count guests per occupation in one chained pipeline:
#   1. drop rows whose occupation column (index 1) is empty,
#   2. lowercase the occupation so e.g. 'Comedian' and 'comedian' share a key,
#   3. sum a 1 per row for each occupation.
# take(20) returns 20 (occupation, count) pairs in whatever order the partitions
# yield them -- NOT the 20 most frequent. For the top 20 use
# takeOrdered(20, key=lambda kv: -kv[1]).
(filtered_daily_show
    .filter(lambda row: row[1] != '')
    .map(lambda row: (row[1].lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .take(20))

# u'actress' (271) dominates this sample, but because these 20 pairs are not
# sorted by count, this output alone does not show it is the most common occupation.


[(u'secretary of state', 1),
 (u'former president of the maldives', 1),
 (u'professional road racing cyclist', 2),
 (u'actress', 271),
 (u'television series creator', 3),
 (u'former governor of illinois', 2),
 (u'telvision personality', 1),
 (u'economist', 17),
 (u'attorney at law', 1),
 (u'baseball player', 8),
 (u'former american senator', 4),
 (u'former governor of nebraska', 4),
 (u'freelance writer', 2),
 (u'former mayor of cincinatti', 2),
 (u'former united states national security advisor', 1),
 (u'astronaut', 2),
 (u'soccer manager', 2),
 (u'public speaker', 1),
 (u'hip-hop artist', 2),
 (u'political consultant', 1)]

In [None]:
#getConf()