In [1]:
from __future__ import print_function
%matplotlib inline
import matplotlib.pylab as plt
import sys, os, glob
import numpy as np
import subprocess

from ipywidgets import interact, interactive, fixed
import ipywidgets as widgets

from IPython.display import HTML
import xml.etree.ElementTree as ET
# Look up the YARN ResourceManager web UI address in the cluster's Hadoop
# configuration.  An unset HADOOP_CONF_DIR raises KeyError (not IOError), so
# both failure modes are caught and reported with the same hint.
try:
    tree = ET.parse(os.path.join(os.environ['HADOOP_CONF_DIR'], 'yarn-site.xml'))
except (KeyError, IOError):
    raise IOError("Can't find the yarn configuration -- is HADOOP_CONF_DIR set?")
root = tree.getroot()
yarn_web_app_properties = root.findall("./property[name='yarn.resourcemanager.webapp.address']")
if not yarn_web_app_properties:
    # same exception type as the bare [0] lookup raised, but with a usable message
    raise IndexError("yarn-site.xml does not define yarn.resourcemanager.webapp.address")
yarn_web_app = yarn_web_app_properties[0].find('value').text
# Substitute the address into the link right away; formatting the result a
# second time is harmless because no placeholder is left in it.
yarn_web_app_string = "If this works successfully, you can check the <a target='_blank' href='http://{yarn_web_app}'>YARN application scheduler</a> and you should see your app listed there. Clicking on the 'Application Master' link will bring up the familiar Spark Web UI. ".format(yarn_web_app=yarn_web_app)

# Apply the style sheet first: plt.style.use() overwrites every rcParam the
# style defines, so the notebook-specific overrides have to come after it.
plt.style.use('fivethirtyeight')
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18

## Set up and launch the Spark runtime

Remember from the previous notebook that we have a saved configuration in `./spark_config/` -- so all we need to do is set the `SPARK_CONF_DIR` environment variable and our default configuration will be used: 

In [2]:
# specify the configuration directory
os.environ['SPARK_CONF_DIR'] = os.path.realpath('../../spark_workshop/notebooks/twitter_dataframes/spark_config')

# how many cores do we have for the driver
ncores = int(os.environ.get('LSB_DJOB_NUMPROC', 1))

# here we set the memory we want spark to use for the driver JVM:
# roughly 0.7 GB per core, but never less than 1 GB ('%d' truncates, so the
# default of a single core used to produce the unusable setting '0G')
os.environ['SPARK_DRIVER_MEMORY'] = '%dG' % max(1, int(ncores * 0.7))

# we have to tell spark which python executable we are using; sys.executable is
# the interpreter running this kernel and is a native str on Python 2 and 3
# (the output of subprocess.check_output is bytes on Python 3)
os.environ['PYSPARK_PYTHON'] = sys.executable

import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext

# Start from a default SparkConf; presumably it picks up the settings stored in
# the SPARK_CONF_DIR exported above -- TODO confirm
conf = SparkConf()

# Create the SparkContext for this notebook against the 'yarn-client' master
sc = SparkContext(master='yarn-client', conf=conf)

## Initialization

Using `DataFrame`s, our entry point into the Spark universe is the `SQLContext` or the equivalent `HiveContext`:

In [3]:
from pyspark.sql import SQLContext, HiveContext

# HiveContext wrapping the SparkContext `sc`; used below to read the parquet data
hc = HiveContext(sc)

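We'll load the data off the disk, but only for December 2014 (the `2014_12*` paths). Switch to the commented-out `2014_1*` pattern to load the last three months of the year instead. 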

In [4]:
%%time
# Alternative pattern matching every 2014_1* path (kept for reference):
# data = hc.read.parquet('/user/roskarr/twitter/2014_1*')
# Read only the parquet paths matching 2014_12*
data = hc.read.parquet('/user/roskarr/twitter/2014_12*')

CPU times: user 7 ms, sys: 3 ms, total: 10 ms
Wall time: 29.3 s


`data` is now a `DataFrame` object, which is essentially a collection of `Row` objects. Each `Row` object contains data for whatever columns are defined in the `DataFrame`. Here is a critical difference between `DataFrames` and `RDD`s: each column has an associated data type. While in dealing with an `RDD` we relied on Python to convert types, here each column has a specified data type. This means that:

a) we have to be a bit more careful about what we are doing and 

b) the execution engine can optimize our calculations because the data is no longer a black box. 

### Importing libraries and types for `DataFrame` API

The next few cells are a bit lengthy and complicated so you can just treat them as black boxes for now. First, we just import some necessary libraries and classes, then a series of functions are defined and executed on the `data` to give us a `DataFrame` of hashtags. 

In [5]:
import pyspark.sql.functions as func
from pyspark.sql import Row, Window
from pyspark.sql.types import IntegerType, ArrayType, StringType, NullType, LongType, StructField, StructType, DateType, DataType, DateConverter, DatetimeConverter, TimestampType, BooleanType
import datetime

## read in keywords

In [6]:
def get_keys(path='../docs/keywords.txt'):
    """Read the keyword list from a text file, one keyword per line.

    Anything following a ``#`` on a line is dropped (``#`` is both the
    delimiter passed here and numpy's default comment marker) and surrounding
    whitespace is stripped.  Keywords are read as 20-byte strings, so longer
    entries are truncated.

    Parameters
    ----------
    path : str
        Location of the keyword file.

    Returns
    -------
    list of str
        The keywords in file order.  Always a list, even when the file holds
        a single keyword.
    """
    keys = np.genfromtxt(path, dtype="|S20", delimiter='#', autostrip=True)
    # genfromtxt squeezes a one-line file to a 0-d array, whose tolist() is a
    # bare scalar rather than a list -- force at least one dimension.
    keys = np.atleast_1d(keys).tolist()
    # On Python 3 the "S" dtype yields bytes, which never compare equal to the
    # str tokens they are matched against; decode them to native strings.
    # On Python 2 bytes is str, so nothing is changed there.
    return [key if isinstance(key, str) else key.decode('utf-8') for key in keys]

In [7]:
# Load the keywords from the default path of get_keys()
keys = get_keys()
# Sanity check: print the type of the returned container
print(type(keys))

<type 'list'>


In [8]:
# Show the loaded keywords
print(keys)

['isis', 'terrorism', 'arab', 'spring', 'attack', 'god', 'christian', 'allah', 'islam', 'syria', 'refugees', 'migrants', 'africa', 'italy', 'ethiopia', 'asylum', 'unhcr', 'immigration', 'foreigners', 'crowded', 'ebola', 'guinea', 'sierra', 'leone', 'liberia', 'virus', 'epidemic', 'vaccine', 'who', 'influenza', 'flu', 'birds', 'swine', 'pig', 'bitcoin', 'rosetta', 'comet', 'higgs', 'climate', 'doomsday', 'maya', 'curiosity', 'sandy', 'hurricane', 'black', 'white', 'mandela', 'nelson', 'left', 'right', 'mh17', 'mh370', 'ukraine', 'crimea', 'russia', 'snowden', 'nsa', 'obama', 'putin', 'pope', 'unemployment', 'boston', 'marathon', 'london', 'europe', 'usa', 'philippines', 'sochi', 'olympics', 'geneva', 'apple', 'linux', 'PC', 'google', 'iphone', 'galaxy', 'watch', 'facebook', 'twitter', 'whatsapp', 'vegan', 'gluten', 'vegetarian', 'meat', 'pasta', 'banana', 'family', 'divorce', 'marriage', 'wedding', 'holidays', 'homework', 'television', 'coffee', 'tea', 'school', 'work', 'teacher', 'spor

### Custom functions to extract hashtags

Change the date format to YYYY-MM-DD HH:MM:SS

In [9]:
def _reformat_twitter_date(date_string):
    """Rewrite a 'Wed Dec 03 10:15:30 +0000 2014'-style string as 'YYYY-MM-DD HH:MM:SS'.

    Returns None for a missing (null) value instead of raising, so a single
    null `created_at` does not abort the whole Spark job.  The input format
    hard-codes the '+0000' offset; strings with any other offset still raise
    ValueError.
    """
    if date_string is None:
        return None
    parsed = datetime.datetime.strptime(date_string, '%a %b %d %H:%M:%S +0000 %Y')
    return parsed.strftime('%Y-%m-%d %H:%M:%S')


# UDF wrapper: string column in, reformatted string column out
convert_date_string = func.udf(_reformat_twitter_date, StringType())

Convert the date string to a datetime object

In [10]:
# `datetime` is bound to the module (see `import datetime`), so the parser has
# to be reached as datetime.datetime.strptime -- datetime.strptime does not
# exist and raised AttributeError as soon as the UDF was evaluated.
# NOTE(review): the declared return type is DateType; if the time of day must
# be kept, TimestampType is probably what is wanted -- confirm.
datetime_udf = func.udf(lambda date_string: datetime.datetime.strptime(date_string, '%a %b %d %H:%M:%S +0000 %Y'), DateType())

Extract the hashtags from a hashtag array and convert them all to lowercase

In [11]:
# UDF: for an array of hashtag entries, return the lowercased `text` of each one
hash_text_udf = func.udf(
    lambda hashtags: [tag.text.lower() for tag in hashtags],
    ArrayType(StringType()),
)

Convert the tweet text string to lowercase

In [12]:
# UDF: lowercase a string column value
key_udf_lower = func.udf(
    lambda text: text.lower(),
    StringType(),
)

Split the text of the tweet

In [13]:
# UDF: split a string column value on whitespace into an array of tokens
key_udf_split = func.udf(
    lambda text: text.split(),
    ArrayType(StringType()),
)

Combine the year and the day of the year in which the tweet was created into a unique integer date ID (`year * 1000 + day of year`, e.g. `2014338`).

In [14]:
def new_date(col):
    """Return a column expression for the day ID ``dayofyear(col) + year(col) * 1000``.

    Built only from Spark column functions, so it is not wrapped in a UDF.
    """
    day_of_year = func.dayofyear(col)
    year_part = func.year(col)*1000
    return day_of_year + year_part

In [15]:
# Snapshot of the keywords as a set: membership tests are O(1) instead of a
# scan of the whole list per token, and the UDF no longer reads the mutable
# global `keys`, which is rebound further down in this notebook.
keyword_set = frozenset(keys)
# UDF: True when at least one token of the row is one of the keywords
key_udf_key = func.udf(lambda row: any(val in keyword_set for val in row), returnType=BooleanType())
# key_udf_key = func.udf(lambda row: keys[0] in row, returnType=BooleanType())

### Apply the functions to the data frame

Define the number of partitions for the data

In [16]:
# Partition count used by the repartition() calls below:
# 200 times the SparkContext's default parallelism
Npartitions = sc.defaultParallelism*200
print(Npartitions)

16000


Create the `hashtag_df` dataframe from all non-empty tweets whose text contains at least one of the keywords: the creation date is converted to an integer day ID, and the tweet text is lowercased and split into an array of words.

In [17]:
# only keep the non-empty tweets whose text contains at least one keyword
hashtag_df = (data.select('created_at', 'text')
              .filter(func.length('text') > 0)  # drop tweets with empty text
              .withColumn('created_at', convert_date_string('created_at'))  # -> 'YYYY-MM-DD HH:MM:SS' string
              .withColumn('created_at', new_date('created_at'))  # -> day ID: dayofyear + year*1000
              .withColumn('text', key_udf_lower('text'))  # lowercase the tweet text
              .withColumn('text', key_udf_split('text'))  # split on whitespace into an array of tokens
              .repartition(Npartitions)
              .filter(key_udf_key('text')))  # keep rows with at least one keyword token
# alternative placement of the repartition, after the keyword filter:
#               .repartition(Npartitions))
# mark the result for caching; as the last expression it also displays the schema
hashtag_df.cache()

DataFrame[created_at: int, text: array<string>]

In [18]:
# Display the first 10 rows without truncating the token arrays
hashtag_df.show(10, truncate=False)

+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|created_at|text                                                                                                                                                                     |
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2014338   |[rt, @lashanekayla:, once, you, stop, chasing, the, wrong, things,, the, right, ones, catch, you..]                                                                      |
|2014338   |[rt, @rbcreamer:, yoho, immigration, bill, cements, gop, position, favoring, mass, deportation, of, immigrants., #immigrantaction]                                       |
|2014335   |[rt, @binkylondon:, just, 3, hours, left, to, get, 25%, off, everything, 

In [48]:
# UDF: does the token array contain the exact token 'apple'?
filter_func = func.udf(lambda tokens: 'apple' in tokens, BooleanType())
# UDF returning the constant 1 for every row
count_func = func.udf(lambda _unused: 1, IntegerType())

# rows whose tokens include 'apple' ...
temp_dt = hashtag_df.repartition(Npartitions).filter(filter_func('text'))
temp_dt.show(10)
# ... grouped by the day ID
group_date = temp_dt.groupBy('created_at')

+----------+--------------------+
|created_at|                text|
+----------+--------------------+
|   2014335|[rt, @mobi4all_en...|
|   2014337|[apple, :, un, br...|
|   2014338|[apple, a, suppri...|
|   2014337|[apple/android?, ...|
|   2014338|[kolumuzda, hiç, ...|
|   2014337|[rt, @appieoffici...|
|   2014336|[apple, macbook, ...|
|   2014339|[rt, @nrinsyfqh_:...|
|   2014335|[google, chromebo...|
|   2014335|[#apple, #iphone6...|
+----------+--------------------+
only showing top 10 rows



In [49]:
# Same 10-row preview, this time without truncating the token arrays
temp_dt.show(10, truncate=False)

+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|created_at|text                                                                                                                                                           |
+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2014335   |[rt, @mobi4all_en:, five, reasons, why, you, should, buy, apple, iphone, 6, plus, -, http://t.co/dn6hvtore1, #iphone6plus, http://t.co/e7tayrarni]             |
|2014337   |[apple, :, un, brevet, pour, en, finir, avec, les, écrans, d'iphone, brisés, ?, http://t.co/u7lr1hbdkp, via, zdnet]                                            |
|2014338   |[apple, a, supprimé, des, chansons, d'ipods, sans, en, avertir, leurs, utilisateurs, -, rts.ch, -, info, -, sciences-tech.,

In [50]:
# Count the rows in each `created_at` group (one row per day ID)
group_df = group_date.count()
group_df.show(10)

+----------+-----+
|created_at|count|
+----------+-----+
|   2014335| 1606|
|   2014336| 2109|
|   2014337| 1984|
|   2014338| 2163|
|   2014339| 2001|
|   2014340| 1773|
|   2014341|  206|
+----------+-----+



In [25]:
# group_df.show(10)
# group_df.coalesce(1).write.format('com.databricks.spark.csv').save('../data/obama.csv')
# group_df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile('/cluster/home03/phys/anweigel/spuriousrelations/data/obama.csv')

In [33]:
# group_df.rdd.collect()

[Row(created_at=2014335, count=1471),
 Row(created_at=2014336, count=1970),
 Row(created_at=2014337, count=1660),
 Row(created_at=2014338, count=2237),
 Row(created_at=2014339, count=1883),
 Row(created_at=2014340, count=1263),
 Row(created_at=2014341, count=164)]

In [36]:
# group_df.rdd.map(lambda x: ",".join(map(str, x))).collect().saveAsTextFile('/cluster/home03/phys/anweigel/spuriousrelations/data/obama.csv')

['2014335,1471',
 '2014336,1970',
 '2014337,1660',
 '2014338,2237',
 '2014339,1883',
 '2014340,1263',
 '2014341,164']

In [None]:
# Collect the grouped counts to the driver as an array of comma-joined row
# strings, one string per row of group_df
brrr = np.array(group_df.rdd.map(lambda x: ",".join(map(str, x))).collect())

In [None]:
# Write the collected row strings, one per line, to apple.csv
# NOTE(review): absolute, user-specific output path -- consider making it configurable
np.savetxt('/cluster/home03/phys/anweigel/spuriousrelations/data/apple.csv', brrr, fmt='%s')

In [30]:
# group_df.rdd.map(lambda x: ",".join(map(str, x))).foreach(print)  #map(lambda x: ",".join(map(str, x))).coalesce(1)#.foreach(print)

In [24]:
# group_df.show(1)

+----------+-----+
|created_at|count|
+----------+-----+
|   2014335| 1471|
+----------+-----+
only showing top 1 row



In [None]:
# Directory receiving one '<keyword>.csv' file of daily counts per keyword
output_dir = '/cluster/home03/phys/anweigel/spuriousrelations/data'

for key in keys:
    # Bind the current keyword as a default argument: a plain closure would only
    # look up the loop variable `key` when the function is serialized or called.
    filter_func = func.udf(lambda row, key=key: key in row, returnType=BooleanType())
    # NOTE(review): hashtag_df was already repartitioned to Npartitions when it
    # was built, so this per-keyword shuffle looks redundant -- confirm and drop.
    temp_dt = (hashtag_df.repartition(Npartitions)
               .filter(filter_func('text'))
              )
    group_date = temp_dt.groupBy('created_at')
    # groupBy gives no ordering guarantee; sort by day ID so every file is
    # written as a chronological series
    group_df = group_date.count().orderBy('created_at')
    # one 'day_id,count' string per row, collected to the driver
    arr = np.array(group_df.rdd.map(lambda x: ",".join(map(str, x))).collect())
    np.savetxt(os.path.join(output_dir, key + '.csv'), arr, fmt='%s')
       

Convert the tweet text to lowercase, split it into a word array and add this into a new column called test. Then check if any of the elements of keys is in this list and add this information in the column intext.

In [None]:
def testfunc(row, keys):
    result_tmp = [True for val in keys]
#     result_tmp = np.array(result_tmp, dtype=np.int)    
    return result_tmp

In [None]:
# replaces the keyword list with two test tokens
keys = ['@tom_wilso:', 'rt']
# key_udf_key = func.udf(lambda row: len([val for val in keys if val in row]) > 0, returnType=BooleanType())
# UDF: one 0/1 flag per entry of `keys`, 1 when that entry occurs in the row
key_udf_global3 = func.udf(
    lambda tokens: [int(keyword in tokens) for keyword in keys],
    ArrayType(IntegerType()),
)

In [None]:
# 'text' already holds the lowercased, whitespace-split tokens (array<string>)
# produced when hashtag_df was built.  Running key_udf_lower on it again calls
# .lower() on a list and fails inside the Python worker, so the existing token
# array is simply copied into the new column 'test'.
hashtag_df = hashtag_df.withColumn('test', hashtag_df['text'])
hashtag_df.show(20)

In [None]:
# replaces the keyword list with two test tokens
keys = ['@oridiva:', 'rt']
# key_udf_key = func.udf(lambda row: len([val for val in keys if val in row]) > 0, returnType=BooleanType())
# UDF: one 0/1 flag per entry of `keys`
key_udf_global3 = func.udf(lambda tokens: [int(keyword in tokens) for keyword in keys], ArrayType(IntegerType()))
# UDFs: 0/1 flag for a single, hard-coded token
key_udf_global4 = func.udf(lambda tokens: int('@oridiva:' in tokens), IntegerType())
key_udf_global5 = func.udf(lambda tokens: int('rt' in tokens), IntegerType())

# add one flag column per token, each computed from the 'test' column
hashtag_df5 = hashtag_df.withColumn('@oridiva:', key_udf_global4('test'))
hashtag_df5 = hashtag_df5.withColumn('rt', key_udf_global5('test'))

In [None]:
# Preview the first 10 rows, including the two flag columns
hashtag_df5.show(10)

In [None]:
# Add 'intext': the array of 0/1 flags produced by key_udf_global3 from the
# 'test' column (one flag per entry of `keys`)
hashtag_df2 = hashtag_df.withColumn('intext', key_udf_global3('test'))
hashtag_df2.show(10)

Compute the day ID of the tweet and add it to the column new_date

In [None]:
# 'created_at' already holds the integer day ID (day of year + year * 1000)
# computed when hashtag_df was built; applying new_date() to it again would run
# date functions on that integer instead of on a date, so the existing ID is
# reused as 'new_date'.
hashtag_df3 = hashtag_df5.withColumn('new_date', hashtag_df5['created_at'])

In [None]:
# Preview the first 10 rows with the added 'new_date' column
hashtag_df3.show(10)

In [None]:
# Keep only the day ID and the two flag columns
hashtag_df4 = hashtag_df3.select('new_date', '@oridiva:', 'rt')

In [None]:
# Preview the first 20 rows of the reduced frame
hashtag_df4.show(20)

In [None]:
# Group the rows by day ID
hashtag_df4_group = hashtag_df4.groupBy('new_date')

In [None]:
# Sum the '@oridiva:' flag column within each day group and collect the result
# to the driver
hashtag_df4_group.sum('@oridiva:').collect()

Questions:
* What does the text method do?
* Why does the `new_date` function work even though it is not a UDF?

In [None]:
# Stop the SparkContext; cells that use `sc`, `hc` or the DataFrames cannot be
# run after this point without creating a new context
sc.stop()

In [42]:
!ls /user/roskarr/twitter/

ls: cannot access /user/roskarr/twitter/: No such file or directory


In [47]:
# NOTE(review): displays the complete process environment; the output can expose
# sensitive values and should be cleared before the notebook is shared.
os.environ

{'LSB_MAX_NUM_PROCESSORS': '200', 'LSF_EAUTH_AUX_PASS': 'yes', 'TMOUT': '86400', '_JAVA_OPTIONS': '-Xmx1024m -Xms256m -XX:ParallelGCThreads=1', 'BSUB_BLOCK_EXEC_HOST': '', 'SHELL': '/bin/bash', 'LSB_DJOB_HB_INTERVAL': '62', 'LSB_UNIXGROUP_INT': 'phys', 'HISTSIZE': '1000', 'LS_EXEC_T': 'START', 'MANPATH': '/cluster/apps/java/jdk1.7.0_03/man:/cluster/apps/modules/share/man:/cluster/apps/lsf/8.0/man:', 'LSB_EEXEC_REAL_GID': '', 'JAVA_HOME': '/cluster/apps/java/jdk1.7.0_03', 'JPY_PARENT_PID': '7581', 'LSF_INVOKE_CMD': 'bsub', 'HADOOP_CONF_DIR': '/cluster/apps/hadoop/hadoop-2.6.0/etc/hadoop', 'LSB_JOBINDEX': '0', 'LS_JOBPID': '7424', 'LSB_JOBID': '16275679', 'HOSTNAME': 'a6264', 'MCR_CACHE_ROOT': '/scratch/matlab_mcr_cache_root_69413', 'LSB_JOBRES_PID': '7424', 'MAIL': '/var/spool/mail/anweigel', 'LS_COLORS': 'rs=0:di=38;5;27:ln=38;5;51:mh=44;38;5;15:pi=40;38;5;11:so=38;5;13:do=38;5;5:bd=48;5;232;38;5;11:cd=48;5;232;38;5;3:or=48;5;232;38;5;9:mi=05;48;5;232;38;5;15:su=48;5;196;38;5;15:sg=48;