In [1]:
import pyspark
from pyspark import SparkContext
from pyspark import SQLContext

In [94]:
from pyspark.sql.functions import col, udf, collect_list, explode
from pyspark.sql.types import ArrayType, StringType
import re

In [3]:
spc = SparkContext.getOrCreate()
sc = SQLContext(spc)
if (sc == None or sc._jsc.sc().isStopped()):
    print("hello")
    sc = SparkContext().Create()

In [4]:
df = sc.read.parquet("gs://data05839/projects.parquet").persist(pyspark.StorageLevel.MEMORY_AND_DISK)

In [5]:
cols = ['ProjectID', 'SchoolID', 'TeacherID',
       'TeacherProjectPostedSequence', 'ProjectType', 'ProjectTitle',
       'ProjectEssay', 'ProjectShortDescription', 'ProjectNeedStatement',
       'ProjectSubjectCategoryTree', 'ProjectSubjectSubcategoryTree',
       'ProjectGradeLevelCategory', 'ProjectResourceCategory',
       'ProjectCost', 'ProjectPostedDate', 'ProjectExpirationDate',
       'ProjectCurrentStatus', 'ProjectFullyFundedDate' ,'date']

In [6]:
df.show()

+--------------------+--------------------+--------------------+----------------------------+-----------+--------------------+--------------------+-----------------------+--------------------+--------------------------+-----------------------------+-------------------------+-----------------------+-----------+-----------------+---------------------+--------------------+----------------------+-------------------+-----------------+
|           ProjectID|            SchoolID|           TeacherID|TeacherProjectPostedSequence|ProjectType|        ProjectTitle|        ProjectEssay|ProjectShortDescription|ProjectNeedStatement|ProjectSubjectCategoryTree|ProjectSubjectSubcategoryTree|ProjectGradeLevelCategory|ProjectResourceCategory|ProjectCost|ProjectPostedDate|ProjectExpirationDate|ProjectCurrentStatus|ProjectFullyFundedDate|               date|__index_level_0__|
+--------------------+--------------------+--------------------+----------------------------+-----------+--------------------+------

In [74]:
df.filter(col("ProjectID").isNull()).count()

0

In [22]:
df.groupBy('ProjectResourceCategory').count().orderBy('count').collect()

[Row(ProjectResourceCategory='Visitors', count=1696),
 Row(ProjectResourceCategory='Musical Instruments', count=4066),
 Row(ProjectResourceCategory='Food, Clothing & Hygiene', count=5438),
 Row(ProjectResourceCategory='Sports & Exercise Equipment', count=6733),
 Row(ProjectResourceCategory='Lab Equipment', count=7436),
 Row(ProjectResourceCategory='Art Supplies', count=8782),
 Row(ProjectResourceCategory='Classroom Basics', count=11862),
 Row(ProjectResourceCategory='Other', count=11862),
 Row(ProjectResourceCategory='Trips', count=13306),
 Row(ProjectResourceCategory='Flexible Seating', count=14425),
 Row(ProjectResourceCategory='Reading Nooks, Desks & Storage', count=15327),
 Row(ProjectResourceCategory='Instructional Technology', count=18957),
 Row(ProjectResourceCategory='Educational Kits & Games', count=22051),
 Row(ProjectResourceCategory='Computers & Tablets', count=32915),
 Row(ProjectResourceCategory='Books', count=107177),
 Row(ProjectResourceCategory='Technology', count=1301

In [60]:
rdd_word_cnts.unpersist()

PythonRDD[133] at RDD at PythonRDD.scala:53

In [61]:
df_resource = df.filter("ProjectResourceCategory = 'Art Supplies'")
rdd_words = df_resource.select('ProjectEssay').rdd\
              .flatMap(lambda x: x).flatMap(lambda x: re.split(r"[^a-z-']+", x.lower()))

In [62]:
rdd_word_cnts = rdd_words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b).cache()

In [63]:
rdd_word_cnts.take(10)

[('are', 34915),
 ('diverse', 2084),
 ('of', 51416),
 ('person', 208),
 ('learners', 2589),
 ('have', 21722),
 ('high', 2846),
 ('group', 2420),
 ('implementing', 60),
 ('start', 903)]

In [64]:
stopwords = {'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", 
             "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 
             'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 
             'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 
             'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 
             'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 
             'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 
             'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 
             'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 
             'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 
             'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 
             'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', 
             "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', 
             "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 
             'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 
             'won', "won't", 'wouldn', "wouldn't"}

In [65]:
rdd_word_cnts_filtered_sorted = rdd_word_cnts.filter(lambda x: x[0] not in stopwords)\
                                             .takeOrdered(300, key = lambda x: -x[1])

In [66]:
[a for a, b in rdd_word_cnts_filtered_sorted[:200]]

['students',
 'school',
 '--donotremoveessaydivider--',
 'art',
 'learning',
 'classroom',
 'help',
 'learn',
 '',
 'many',
 'need',
 'create',
 'use',
 'materials',
 'supplies',
 'work',
 'make',
 'love',
 'come',
 'new',
 'class',
 'year',
 'able',
 'also',
 'day',
 'would',
 'projects',
 'project',
 'skills',
 'want',
 'children',
 'paper',
 'one',
 'creative',
 'time',
 'every',
 'different',
 'like',
 'get',
 'student',
 'community',
 'allow',
 'paint',
 'provide',
 'using',
 'teach',
 'used',
 'grade',
 'activities',
 'way',
 'free',
 'fun',
 'high',
 'us',
 'life',
 'world',
 'science',
 'best',
 'well',
 'math',
 'kids',
 'opportunity',
 'learners',
 'see',
 'give',
 'education',
 'language',
 'markers',
 'needs',
 'families',
 'first',
 'experience',
 'teacher',
 'group',
 'home',
 'things',
 'lunch',
 'special',
 'reading',
 'creating',
 'working',
 'arts',
 'writing',
 'great',
 'excited',
 'explore',
 'creativity',
 'much',
 'variety',
 'diverse',
 'small',
 'hard',
 'eager

## remove "ing" in the tokens before computing word features, try s and no s as plural

In [71]:
word_feats = ['student', 'school', 'learn', 'classroom', 'help', 'work', 'read', 'love','day','class','skill','book',
 'technology', 'time', 'one', 'math', 'material','grade','children','different','project','teach','like',
 'world','create','best','learners','science','education','community','language','home','activities','free',
 'access','opportunity','life','first','fun','hard','environment',
 'lunch','resource','experience', 'opportunities','excited','diverse','eager','play','art',
 'challenge','creative','goal','music','amazing','social',
 'poverty','games','hands-on','research','knowledge','engaging','safe','computer','literacy','reduced',
 'however','comfortable', 'band', 'instrument', 'musical', 'healthy', 'breakfast', 'hungry', 'team', 'sport',
 'hurricane', 'health', 'volleyball', 'basketball', 'soccer', 'college', 'museum', 'paint', 'activity']

## process df to get word features

In [72]:
len(word_feats)

85

In [87]:
@udf(returnType=ArrayType(StringType()))
def get_feats(ProjectID, text):
    tokens = re.split(r"[^a-z-']+", text.lower())
    L = [0] * len(word_feats)
    for i in range(0, len(word_feats)):
        f = word_feats[i]
        for t in tokens:
            if t.endswith('s'):
                if t == f or t[:-1] == f:
                    L[i] += 1
            elif t.endswith('ing'):
                if t == f or t[:-3] == f:
                    L[i] += 1
            else:
                if t == f:
                    L[i] += 1
    return [ProjectID] + [str(cnt) for cnt in L]
                

In [91]:
df = df.repartition(200)

In [92]:
feats = df.select(get_feats("ProjectID", "ProjectEssay")) \
          .persist(pyspark.StorageLevel.MEMORY_AND_DISK)

In [93]:
feats.show()

+----------------------------------+
|get_feats(ProjectID, ProjectEssay)|
+----------------------------------+
|              [1dd5a2cc71412086...|
|              [5f5d543c5840b351...|
|              [b85065c595a38b1e...|
|              [1e0efc5f3bf4d1ba...|
|              [e3eafe099ef343bc...|
|              [a6a65c0b2471debf...|
|              [8bfd9415970c5dee...|
|              [c982c1fc953b696e...|
|              [ea9c9e1ea82c503a...|
|              [2e7a6b6acdc39905...|
|              [23432099b09ddf9c...|
|              [c7afb1d9a066ffe2...|
|              [24d18dd85562a813...|
|              [019ee6bba8809ff6...|
|              [9c157043f300ced6...|
|              [9686fb2974418dec...|
|              [7c852f2c7173e662...|
|              [e2845c6c8380865b...|
|              [067be7698fb07d98...|
|              [291576d5cf84bf6d...|
+----------------------------------+
only showing top 20 rows



In [104]:
cols = ["ProjectID"] + word_feats

df_feats = feats.rdd.map(lambda x: tuple(x["get_feats(ProjectID, ProjectEssay)"])).toDF(cols)\
                    .persist(pyspark.StorageLevel.MEMORY_AND_DISK)

In [None]:
df_feats.count()

646745

In [106]:
df_feats.show()

+--------------------+-------+------+-----+---------+----+----+----+----+---+-----+-----+----+----------+----+---+----+--------+-----+--------+---------+-------+-----+----+-----+------+----+--------+-------+---------+---------+--------+----+----------+----+------+-----------+----+-----+---+----+-----------+-----+--------+----------+-------------+-------+-------+-----+----+---+---------+--------+----+-----+-------+------+-------+-----+--------+--------+---------+--------+----+--------+--------+-------+-------+-----------+----+----------+-------+-------+---------+------+----+-----+---------+------+----------+----------+------+-------+------+-----+--------+
|           ProjectID|student|school|learn|classroom|help|work|read|love|day|class|skill|book|technology|time|one|math|material|grade|children|different|project|teach|like|world|create|best|learners|science|education|community|language|home|activities|free|access|opportunity|life|first|fun|hard|environment|lunch|resource|experience|opp

In [107]:
df_feats.write.parquet("gs://data05839/tokens")

In [108]:
df_feats.coalesce(1).write.parquet("gs://data05839/tokens_single_file")

In [109]:
df_feats.coalesce(1).write.csv("gs://data05839/tokens_single_file_csv")