In [1]:
# Import packages
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row

In [2]:
# Configure and create the Spark Context object 

conf = SparkConf().setAppName("Project1").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [3]:

# Read the documents locally, one record correspond to one doc
XTrainOri = sc.textFile("data_for_initial_local_training/X_train_vsmall.txt").cache()
YTrainOri = sc.textFile("data_for_initial_local_training/Y_train_vsmall.txt").cache()


# To read the documents on GCP
#XTrainOri = sc.textFile("gs://uga-dsp/project1/train/X_train_vsmall.txt").cache()
#YTrainOri = sc.textFile("gs://uga-dsp/project1/train//Y_train_vsmall.txt").cache()


In [4]:
# split documents into words, remove punctuation, trainsform words to lower case

# ? still need to remove quot from words, and punctuation in the middle of a word

punctuation = sc.broadcast(".,:;'!?&_-()")
XTrainCP = XTrainOri.zipWithIndex().flatMap(lambda x: map(lambda e: (e, x[1]), x[0].split()))\
                    .map(lambda s: (s[0].strip(punctuation.value).lower(),s[1])) 
                    
print(XTrainCP.take(10))

[('a', 0), ('dedicated', 0), ('quot;snow', 0), ('desk&quot', 0), ('has', 0), ('been', 0), ('set', 0), ('up', 0), ('by', 0), ('the', 0)]


In [5]:
# Remove Stopwords using the file stopwords.txt provided in project0

# ? Change source of stopwords
stopWordsFile = sc.textFile("stopwords.txt")
stopWords = sc.broadcast(stopWordsFile.flatMap(lambda s: s.split()).collect())

XTrainCPS = XTrainCP.filter(lambda s: s if s[0] not in stopWords.value and s[0] else None) \
                    
print(XTrainCPS.take(10))

[('dedicated', 0), ('quot;snow', 0), ('desk&quot', 0), ('set', 0), ('up', 0), ('new', 0), ('york', 0), ('new', 0), ('jersey', 0), ('port', 0)]


In [6]:
# count the # of (word,index of document) pairs 

XTrainCPSC = XTrainCPS.map(lambda s: (s,1))\
                        .reduceByKey( lambda a,b: a + b)
print(XTrainCPSC.take(10))    

[(('rebounding', 46), 1), (('+0.4', 19), 1), (('quot;foreign', 1), 1), (('generally', 51), 1), (('intentions', 24), 1), (("thursday's", 35), 1), (('bounce', 8), 1), (('annual', 67), 1), (("don't", 3), 1), (('snow-melters', 0), 1)]


In [7]:
# the structure of output: (index of document, dictionary of {word: count of word})

XTrainDict = XTrainCPSC.map(lambda x: (x[0][1],(x[0][0],x[1])))\
                            .groupByKey()\
                            .map(lambda x: (x[0],dict(x[1])))


print(XTrainDict.take(1))

[(0, {'weather': 5, 'key': 2, 'humidity': 1, '171': 2, '5,100': 1, 'reports': 2, 'staff': 1, 'laguadria': 1, 'cargo': 2, 'blowers': 1, 'deploy': 1, 'officer': 1, 'data': 1, 'operations': 2, 'forecasts': 1, 'employees': 1, 'direction': 1, 'maintenance': 1, 'advance': 1, 'through': 1, 'salt': 1, 'fax+44': 1, 'technology': 1, 'few': 1, 'hit': 1, 'private': 1, "what's": 1, 'kennedy': 1, 'tons': 1, 'travellers': 1, 'david': 1, 'anticipate': 1, 'round': 1, 'set': 1, 'york': 1, 'national': 1, 'jersey': 1, 'each': 4, "don't": 1, 'almost': 1, 'use': 1, 'dedicated': 2, 'snow': 1, 'counter': 1, 'blast': 1, 'chief': 1, 'windspeed': 1, 'pieces': 1, 'monitor': 1, 'accordingly': 1, 'sit': 1, 'deployment': 1, 'harsh': 1, '5017': 1, 'special': 1, 'inground': 1, 'including': 1, '250': 1, 'disruption': 1, 'snow-melters': 1, 'operates': 1, 'de-icing': 1, 'companies': 1, 'supplements': 1, 'f': 1, 'airport': 2, 'times': 1, '7706': 1, 'facility-specific': 1, 'day': 1, 'react': 1, 'personnel': 1, 'jfk': 1, 'a

In [8]:
# collect all words throughout all documents as column names.
cols = XTrainCPS.map(lambda s: s[0])\
                .distinct()\
                .collect()
print(len(cols))    

4218


In [9]:
# create row object for each document, put 0 if the word does not exist in the document

XTrainRow = XTrainDict.map(lambda x: [Row(**{k:(x[1][k] if k in x[1].keys() else 0) for k in cols})],cols)

In [10]:
# Create the data frame

sqlContext = SQLContext(sc)

XTrainDF = sqlContext.createDataFrame(XTrainRow)


In [11]:
# display the first two rows of the data frame
XTrainDF.head(2)

[Row(_1=Row($=0, $0.24=0, $1.2=0, $1.23=0, $1.3=0, $1.6-million=0, $1.61=0, $1.74=0, $1.95=0, $11=0, $11.00-12.50=0, $12.50-14.50=0, $13.650=0, $15.00-16.00=0, $15.00-17.00=0, $16.00-16.75=0, $16.00-18.00=0, $16.50-18.00=0, $16.90-17.90=0, $17.00-18.00=0, $1=53=0, $2=0, $2.15=0, $20.15-20.30=0, $200,000=0, $21.05-21.25=0, $21.79=0, $23.96=0, $25=0, $265=0, $28.015=0, $28.5=0, $3=0, $3.5=0, $4.4=0, $410=0, $447=0, $482=0, $51=0, $55.895=0, $7.3=0, $7.5=0, $700=0, $8.50-9.00=0, $9.00-10.00=0, $900=0, $=550=0, *=0, +0.02=0, +0.3=0, +0.4=0, +0.5=0, +0.9=0, +2.6=0, +202=0, +225=0, +27=0, +3.0=0, +371=0, +396=0, +41=0, +44=0, +48=0, +7095=0, +=254-2=0, 0=0, 0-0=0, 0-3=0, 0.0=0, 0.14=0, 0.150=0, 0.2=0, 0.275=0, 0.300=0, 0.46=0, 0.6=0, 0.6890/00=0, 0.6925=0, 0.6938=0, 0.6950=0, 0.6950-70=0, 0.725=0, 0.8=0, 000=0, 0200=0, 0917=0, 1=0, 1,000=0, 1,081.19=0, 1,148.8=0, 1,200=0, 1,764=0, 1-0=0, 1-1=0, 1-1/4=0, 1.00=0, 1.08=0, 1.09=0, 1.1=0, 1.1243=0, 1.1260=0, 1.16=0, 1.25=0, 1.27=0, 1.3=0, 1.3462/