In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
baseDir = '/Users/linamiao/Downloads/'
dfSmall = sqlContext.read.parquet(baseDir + 'smallwiki.parquet')
print dfSmall.count()

13


In [7]:
print 'dfSmall: {0}'.format(dfSmall)

dfSmall: DataFrame[last_contributor_username: string, redirect_title: string, text: string, timestamp: string, title: string]


In [8]:
print 'type(dfSmall): {0}'.format(type(dfSmall))

type(dfSmall): <class 'pyspark.sql.dataframe.DataFrame'>


In [9]:
print dfSmall.schema

StructType(List(StructField(last_contributor_username,StringType,true),StructField(redirect_title,StringType,true),StructField(text,StringType,true),StructField(timestamp,StringType,true),StructField(title,StringType,true)))


In [10]:
dfSmall.printSchema()

root
 |-- last_contributor_username: string (nullable = true)
 |-- redirect_title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)



In [12]:
from pyspark.sql.types import StructField

In [15]:
from pyspark.sql.types import StructType, StructField, BooleanType, StringType, LongType
from pyspark.sql import Row

schema = StructType([StructField('title', StringType(), nullable=False, metadata={'language': 'english'}),
                     StructField('numberOfEdits', LongType()),
                     StructField('redacted', BooleanType())])

exampleData = sc.parallelize([Row("Baade's Window", 100, False),
                              Row('Zenomia', 10, True),
                              Row('United States Bureau of Mines', 5280, True)])

exampleDF = sqlContext.createDataFrame(exampleData, schema)
exampleDF.printSchema()
exampleDF.show()

root
 |-- title: string (nullable = false)
 |-- numberOfEdits: long (nullable = true)
 |-- redacted: boolean (nullable = true)

+--------------------+-------------+--------+
|               title|numberOfEdits|redacted|
+--------------------+-------------+--------+
|      Baade's Window|          100|   false|
|             Zenomia|           10|    true|
|United States Bur...|         5280|    true|
+--------------------+-------------+--------+



In [17]:
print exampleDF.schema.fields[1].metadata

{}


In [18]:
print dfSmall.first()

Row(last_contributor_username=None, redirect_title=None, text=None, timestamp=u'2017-03-15T02:49:58Z', title=u'Adobe Systems')


In [19]:
print dfSmall.columns

['last_contributor_username', 'redirect_title', 'text', 'timestamp', 'title']


In [20]:
print dfSmall.drop('text').first()

Row(last_contributor_username=None, redirect_title=None, timestamp=u'2017-03-15T02:49:58Z', title=u'Adobe Systems')


In [24]:
print dfSmall.select('text').first()[0]

None


In [25]:
from pyspark.sql.functions import col
errors = dfSmall.filter(col('title') == '<PARSE ERROR>')
errorCount = errors.count()
print errorCount
print errorCount / float(dfSmall.count())

0
0.0


In [26]:
from pyspark.sql.functions import col
errors = dfSmall.filter(col('title') == '<PARSE ERROR>')
errorCount = errors.count()
print errorCount
print errorCount / float(dfSmall.count())

0
0.0


In [27]:
print dfSmall.filter(dfSmall['title'] == '<PARSE ERROR>').count()
print dfSmall.filter(dfSmall.title == '<PARSE ERROR>').count()

0
0


In [28]:
errors.select(col('title').alias('badTitle')).show(3)

+--------+
|badTitle|
+--------+
+--------+



In [31]:
(dfSmall
 .select(col('redirect_title').isNotNull().alias('hasRedirect'))
 .groupBy('hasRedirect')
 .count()
 .show())

+-----------+-----+
|hasRedirect|count|
+-----------+-----+
|       true|    4|
|      false|    9|
+-----------+-----+



In [32]:
filtered = dfSmall.filter((col('title') != '<PARSE ERROR>') &
                           col('redirect_title').isNull() &
                           col('text').isNotNull())
print filtered.count()

0


In [33]:
import pyspark.sql.functions as func
dir(func)

['AutoBatchedSerializer',
 'Column',
 'DataFrame',
 'PickleSerializer',
 'SparkContext',
 'StringType',
 'UserDefinedFunction',
 '__all__',
 '__builtins__',
 '__doc__',
 '__file__',
 '__name__',
 '__package__',
 '_binary_mathfunctions',
 '_create_binary_mathfunction',
 '_create_function',
 '_create_window_function',
 '_functions',
 '_functions_1_4',
 '_functions_1_6',
 '_functions_2_1',
 '_prepare_for_python_RDD',
 '_string_functions',
 '_test',
 '_to_java_column',
 '_to_seq',
 '_window_functions',
 '_wrap_function',
 'abs',
 'acos',
 'add_months',
 'approxCountDistinct',
 'approx_count_distinct',
 'array',
 'array_contains',
 'asc',
 'ascii',
 'asin',
 'atan',
 'atan2',
 'avg',
 'base64',
 'bin',
 'bitwiseNOT',
 'blacklist',
 'broadcast',
 'bround',
 'cbrt',
 'ceil',
 'coalesce',
 'col',
 'collect_list',
 'collect_set',
 'column',
 'concat',
 'concat_ws',
 'conv',
 'corr',
 'cos',
 'cosh',
 'count',
 'countDistinct',
 'covar_pop',
 'covar_samp',
 'crc32',
 'create_map',
 'cume_dist',


In [34]:
dfSmall.select('timestamp').show(5)

+--------------------+
|           timestamp|
+--------------------+
|2017-03-15T02:49:58Z|
|2017-03-14T11:27:49Z|
|2017-03-11T02:42:19Z|
|2017-01-21T22:57:26Z|
|2017-01-10T02:55:21Z|
+--------------------+
only showing top 5 rows



In [35]:
dfSmall.select('timestamp',func.date_format('timestamp','MM/dd/yyyy').alias('date')).show(5)

+--------------------+----------+
|           timestamp|      date|
+--------------------+----------+
|2017-03-15T02:49:58Z|03/14/2017|
|2017-03-14T11:27:49Z|03/14/2017|
|2017-03-11T02:42:19Z|03/10/2017|
|2017-01-21T22:57:26Z|01/21/2017|
|2017-01-10T02:55:21Z|01/09/2017|
+--------------------+----------+
only showing top 5 rows



In [40]:
withDate = dfSmall.withColumn('date', func.date_format('timestamp','MM/DD/YY'))
withDate.printSchema()
withDate.select('title','date').show(3)

root
 |-- last_contributor_username: string (nullable = true)
 |-- redirect_title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: string (nullable = true)

+-----------------+--------+
|            title|    date|
+-----------------+--------+
|    Adobe Systems|03/73/17|
|Andrzej Sapkowski|03/73/17|
| Plague (disease)|03/69/17|
+-----------------+--------+
only showing top 3 rows



In [41]:
withCEST = withDate.withColumn('cest_time', func.from_utc_timestamp('timestamp','Europe/Amsterdam'))
withCEST.printSchema()

root
 |-- last_contributor_username: string (nullable = true)
 |-- redirect_title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: string (nullable = true)
 |-- cest_time: timestamp (nullable = true)



In [43]:
withCEST.select('timestamp','cest_time').show(3,False)

+--------------------+---------------------+
|timestamp           |cest_time            |
+--------------------+---------------------+
|2017-03-15T02:49:58Z|2017-03-14 20:49:58.0|
|2017-03-14T11:27:49Z|2017-03-14 05:27:49.0|
|2017-03-11T02:42:19Z|2017-03-10 19:42:19.0|
+--------------------+---------------------+
only showing top 3 rows



In [47]:
lowerid = withCEST.select('*', func.lower(col('text')).alias('lowerText'))
print lowerid.select('lowerText').first()[0]

None


In [48]:
print lowerid.columns

['last_contributor_username', 'redirect_title', 'text', 'timestamp', 'title', 'date', 'cest_time', 'lowerText']


In [52]:
parsed = (lowerid.drop('text')
                 .drop('timestamp')
                 .drop('date')
                 .withColumnRenamed('lowerText','text'))
print parsed.columns, '\n\n'
print parsed.show(3)

['last_contributor_username', 'redirect_title', 'title', 'cest_time', 'text'] 


+-------------------------+--------------+-----------------+--------------------+----+
|last_contributor_username|redirect_title|            title|           cest_time|text|
+-------------------------+--------------+-----------------+--------------------+----+
|                     null|          null|    Adobe Systems|2017-03-14 20:49:...|null|
|                     null|          null|Andrzej Sapkowski|2017-03-14 05:27:...|null|
|           Bender the Bot|          null| Plague (disease)|2017-03-10 19:42:...|null|
+-------------------------+--------------+-----------------+--------------------+----+
only showing top 3 rows

None


In [54]:
from pyspark.ml.feature import RegexTokenizer
tokenizer = (RegexTokenizer()
             .setInputCol('text')
             .setOutputCol('words')
             .setPattern('\\W+'))
wordsDF = tokenizer.transform(parsed)

In [56]:
stopWords = set(sc.textFile('/Users/linamiao/playground/spark/stop_words.txt').collect())
print [word for i, word in zip(range(5),stopWords)]

[u'all', u'six', u'less', u'being', u'indeed']


In [59]:
import re
stopWordsBroadCast = sc.broadcast(stopWords)

def keepWord(word):
    if len(word) < 3:
        return False
    
    if word in stopWordsBroadCast.value:
        return False
    
    if re.search(re.compile(r'[0-9_]'),word):
        return False
    
    return True

def removeWords(words):
    return [word for word in words if keepWord(word)]

removeWords(['test', 'cat', 'do343', '343', 'spark', 'the', 'and', 'hy-phen', 'under_score'])

['test', 'cat', 'spark', 'hy-phen']

In [65]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

removeWordsUDF = udf(removeWords,ArrayType(StringType()))

In [63]:
noStopWords = (wordsDF
               .withColumn('noStopWords', removeWordsUDF(col('words')))
               .drop('words')
               .withColumnRenamed('noStopWords', 'words'))


In [64]:
wordList.registerTempTable('wordList')
wordGroupCount2 = sqlContext.sql('select word, count(word) as counts from wordList group by word order by counts desc')
wordGroupCount2.take(5)

NameError: name 'wordList' is not defined