In [1]:
import findspark

In [2]:
import os

# Locate the Spark installation. Prefer the SPARK_HOME environment variable
# when it is set so the notebook is not tied to one user's home directory;
# otherwise fall back to the local install path used when this was written.
findspark.init(os.environ.get('SPARK_HOME', '/home/joe/spark-2.4.3-bin-hadoop2.7'))

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Create (or reuse) a Spark session running on the "local" master,
# registered under the application name 'health'.
spark = (
    SparkSession.builder
    .master("local")
    .appName('health')
    .getOrCreate()
)

In [6]:
from pyspark.sql.types import (StructField,StringType,
                               TimestampType,LongType,
                               IntegerType,StructType)

In [7]:
# Explicit column layout for the tweet CSV files: every field is nullable,
# the two counters are integers, and everything else is kept as a string.
data_schema = [
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('date', StringType()),
        ('user_id', StringType()),
        ('user_name', StringType()),
        ('favorite_count', IntegerType()),
        ('retweet_count', IntegerType()),
        ('text', StringType()),
        ('user_location', StringType()),
    )
]

In [8]:
# Wrap the field list in a StructType so it can be passed as a reader schema.
final_struc = StructType(data_schema)

In [9]:
from functools import reduce

# The data set is split into seven CSV parts named part00.csv .. part06.csv.
# Generate the paths from one template instead of repeating the read call.
part_paths = ['datasets/part{:02d}.csv'.format(part_no) for part_no in range(7)]

# Read every part with the same explicit schema.
part_frames = [spark.read.csv(path, schema=final_struc) for path in part_paths]

# Keep the per-part names available for any cell that refers to them.
df0, df1, df2, df3, df4, df5, df6 = part_frames

# Stack the parts in file order: ((df0 U df1) U df2) ... U df6.
df_raw = reduce(lambda stacked, part: stacked.union(part), part_frames)

In [17]:
#Step 1: Missing Data:
# Total number of rows in the combined raw frame, before any cleaning.
df_raw.count()

446011

In [18]:
# Number of distinct values in the 'text' column of the raw frame.
df_raw.select('text').distinct().count()

187565

In [19]:
# Discard every row that has a null in any column.
df = df_raw.dropna()

In [20]:
# Rows remaining after the null-containing rows were dropped.
df.count()

295327

In [21]:
# Peek at the first 20 rows of the null-free frame.
df.head(20)

[Row(date='Fri Mar 10 20:43:25 +0000 2017', user_id='49171842', user_name='mmadamimadamm', favorite_count=0, retweet_count=1, text='RT @starr_d: LOL RT @AdamParkhomenko Trumpcare https://t.co/jVQBerfCq1', user_location='Montreal, Quebec, Canada'),
 Row(date='Fri Mar 10 21:00:35 +0000 2017', user_id='2829157034', user_name='Easy Artisan Website', favorite_count=0, retweet_count=0, text='RT FCSpotlight: What innovation would you like to build? In health care, retail, mobile, entertainment — share you… https://t.co/tNgr99By7H', user_location='Toronto, Ontario'),
 Row(date='Fri Mar 10 21:44:03 +0000 2017', user_id='1419293432', user_name='Tracy Dallas', favorite_count=0, retweet_count=123, text="RT @MotherJones: Trumpcare Would Make America's Opioid Epidemic Even Worse https://t.co/XsAN04Wzmr", user_location='Vancouver, BC'),
 Row(date='Fri Mar 10 22:54:51 +0000 2017', user_id='289607643', user_name='Jordan "DELETES JULY 6" Woodward', favorite_count=0, retweet_count=0, text='Reduce governm

In [22]:
#Step 2: Formatting:
# Formatting "date" Attribute:
from pyspark.sql.functions import split

# Tokenize the raw date string on single spaces. For a value such as
# 'Fri Mar 10 20:43:25 +0000 2017' the tokens are:
#   0 = weekday, 1 = month abbreviation, 2 = day, 3 = time, 4 = offset, 5 = year
split_date = split(df['date'], ' ')

In [23]:
# Derive calendar columns from the tokenized date string:
# token 5 is the year, token 1 the month abbreviation, token 2 the day.
# Year and day are cast to integers; month stays a string at this point.
df_split = (
    df
    .withColumn('year', split_date.getItem(5).cast('int'))
    .withColumn('month', split_date.getItem(1))
    .withColumn('day', split_date.getItem(2).cast('int'))
)

In [26]:
# Display the first rows, including the new year / month / day columns.
df_split.show()

+--------------------+------------------+--------------------+--------------+-------------+--------------------+--------------------+----+-----+---+
|                date|           user_id|           user_name|favorite_count|retweet_count|                text|       user_location|year|month|day|
+--------------------+------------------+--------------------+--------------+-------------+--------------------+--------------------+----+-----+---+
|Fri Mar 10 20:43:...|          49171842|       mmadamimadamm|             0|            1|RT @starr_d: LOL ...|Montreal, Quebec,...|2017|  Mar| 10|
|Fri Mar 10 21:00:...|        2829157034|Easy Artisan Website|             0|            0|RT FCSpotlight: W...|    Toronto, Ontario|2017|  Mar| 10|
|Fri Mar 10 21:44:...|        1419293432|        Tracy Dallas|             0|          123|RT @MotherJones: ...|       Vancouver, BC|2017|  Mar| 10|
|Fri Mar 10 22:54:...|         289607643|Jordan "DELETES J...|             0|            0|Reduce governme

In [27]:
from pyspark.sql import functions as F

# Three-letter month abbreviations in calendar order, so that
# (position in the list + 1) is the month number.
month_abbrevs = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

# Generate the chained CASE WHEN from the table above. Building it from a
# table avoids copy-paste slips such as testing one abbreviation twice and
# leaving 'Dec' unmapped.
month_col = df_split['month']
month_case = F.when(month_col == month_abbrevs[0], 1)
for month_number, abbrev in enumerate(month_abbrevs[1:], start=2):
    month_case = month_case.when(month_col == abbrev, month_number)

# Unrecognized values fall through unchanged; because of that fallback the
# resulting 'month_num' column is a string column. The source 'month' and
# 'date' columns are no longer needed afterwards.
df_1 = (
    df_split
    .withColumn('month_num', month_case.otherwise(month_col))
    .drop('month')
    .drop('date')
)

In [28]:
# Check the column types after the month mapping ('month_num' is still a string).
df_1.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month_num: string (nullable = true)



In [29]:
# Add an integer-typed 'month' column obtained by casting the string 'month_num'.
df_2 = df_1.withColumn('month', df_1.month_num.cast('int'))

In [30]:
# Confirm the new 'month' column is an integer.
df_2.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month_num: string (nullable = true)
 |-- month: integer (nullable = true)



In [31]:
# Keep only the columns needed downstream, date parts first; this also
# drops the intermediate string column 'month_num'.
df_2 = df_2.select(
    'year', 'month', 'day',
    'user_id', 'user_location', 'user_name',
    'favorite_count', 'retweet_count',
    'text',
)

In [32]:
# Verify the column order and types after the select.
df_2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)



In [32]:
#Step 2: Formatting:
# Formatting "user_location" Attribute:

In [33]:
# Sanity check for the province matching: rows whose location mentions
# Newfoundland and Labrador by full name or by the 'NL' code.
# The predicate is built from df_2's own column (not from the earlier frame
# `df`) so it does not depend on Spark resolving the column through lineage.
test = df_2.filter(
    df_2['user_location'].rlike('Newfoundland and Labrador')
    | df_2['user_location'].rlike('NL')
)

In [34]:
# How many rows matched the Newfoundland and Labrador / 'NL' patterns.
test.count()

3399

In [35]:
# Inspect a sample of the matched rows.
test.show()

+----+-----+---+----------+--------------------+-----------------+--------------+-------------+--------------------+
|year|month|day|   user_id|       user_location|        user_name|favorite_count|retweet_count|                text|
+----+-----+---+----------+--------------------+-----------------+--------------+-------------+--------------------+
|2017|    3| 13|  20446536|Newfoundland and ...|    Terry Cormier|             0|         3024|RT @timkaine: POT...|
|2017|    6| 14|  25493262|Stephenville Cros...|    D'Arcy Butler|             0|            0|@VOCMOpenline sur...|
|2017|    6| 20| 280082481|Newfoundland and ...|     Janis Sexton|             0|         3846|RT @mattmfm: Back...|
|2017|    6| 20| 280082481|Newfoundland and ...|     Janis Sexton|             0|         5670|RT @ChrisMurphyCT...|
|2017|    3| 12| 280082481|Newfoundland and ...|     Janis Sexton|             0|          812|RT @docrocktex26:...|
|2017|    3| 14| 280082481|Newfoundland and ...|     Janis Sexto

In [36]:
from pyspark.sql import functions as F

# (full province name, two-letter code) pairs, in match-priority order:
# the first pair whose name or code is found in the location wins.
province_patterns = [
    ('Newfoundland and Labrador', 'NL'),
    ('Alberta', 'AB'),
    ('Saskatchewan', 'SK'),
    ('Prince Edward Island', 'PE'),
    ('British Columbia', 'BC'),
    ('Manitoba', 'MB'),
    ('New Brunswick', 'NB'),
    ('Nova Scotia', 'NS'),
    ('Quebec', 'QC'),
    ('Ontario', 'ON'),
]

# NOTE(review): rlike does an unanchored, case-sensitive regex search, so a
# two-letter code also matches inside longer upper-case strings — confirm
# this is acceptable for the location data.
location = df_2.user_location
province_case = None
for full_name, code in province_patterns:
    matches = location.rlike(full_name) | location.rlike(code)
    if province_case is None:
        province_case = F.when(matches, code)
    else:
        province_case = province_case.when(matches, code)

# Locations that match no province get a null code; the raw location and
# the user id columns are dropped from the result.
df_3 = (
    df_2
    .withColumn('user_location_ab', province_case.otherwise(None))
    .drop('user_location')
    .drop('user_id')
)

In [37]:
# Check the schema now that 'user_location_ab' replaces 'user_location' and 'user_id'.
df_3.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- user_name: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- user_location_ab: string (nullable = true)



In [38]:
# Summary statistics per column; the 'count' row shows how many rows got a non-null province code.
df_3.describe().show()

+-------+--------------------+------------------+------------------+--------------------+------------------+------------------+--------------------+----------------+
|summary|                year|             month|               day|           user_name|    favorite_count|     retweet_count|                text|user_location_ab|
+-------+--------------------+------------------+------------------+--------------------+------------------+------------------+--------------------+----------------+
|  count|              295327|            295327|            295327|              295327|            295327|            295327|              295327|          295299|
|   mean|              2017.0| 7.327633436834424|16.227832199561842|1.5043132632916667E9|0.9724847372573453| 5002.326438151608|                null|            null|
| stddev|1.802240111135220...|1.8180823835140105| 7.756094624296451|3.4359974325219607E9|163.20630493463077|15030.196391165475|                null|            null|
|   

In [39]:
# Look at the first row of the reshaped frame.
df_3.head()

Row(year=2017, month=3, day=10, user_name='mmadamimadamm', favorite_count=0, retweet_count=1, text='RT @starr_d: LOL RT @AdamParkhomenko Trumpcare https://t.co/jVQBerfCq1', user_location_ab='QC')

In [40]:
# Count the rows that were labelled with the 'NL' province code.
df_3.where(df_3.user_location_ab == 'NL').count()

3399

In [41]:
# Total row count of df_3.
df_3.count()

295327

In [42]:
# Number of distinct (text, user_name) combinations.
df_3.select('text','user_name').distinct().count()

290363

In [43]:
# List the distinct (month, year) combinations present in the data.
df_3.select('month','year').distinct().show()

+-----+----+
|month|year|
+-----+----+
|    3|2017|
|    6|2017|
|    7|2017|
|    8|2017|
|    9|2017|
|    5|2017|
|   10|2017|
+-----+----+



In [44]:
#Step 4: Drop Duplicates:

# Row count with fully identical rows removed; the result is only displayed here, not stored.
df_3.dropDuplicates().count()

294556

In [45]:
# Keep one copy of each fully identical row.
df_4 = df_3.distinct()

In [67]:
#Step 5: Adding a label based on sentiment analysis about 'text':
# Step 5.1: Add descriptive features about 'text' (word count, character count, average word length)

In [46]:
import pandas as pd
import numpy as np

In [47]:
# Convert the de-duplicated Spark DataFrame into a pandas DataFrame for text processing.
pandas_df = df_4.toPandas()

In [48]:
# Preview the first rows of the pandas frame.
pandas_df.head()

Unnamed: 0,year,month,day,user_name,favorite_count,retweet_count,text,user_location_ab
0,2017,3,11,ADub,0,16427,RT @GeorgeTakei: The GOP wants to ensure the m...,SK
1,2017,6,12,Annapolis NDP,1,0,Urgent-health care planning must be based on a...,NS
2,2017,6,21,Lethbridge living,1,0,This research project is focused on making pri...,AB
3,2017,6,14,THĒ RÅÏŃ MÅÑ ☔️,0,40988,RT @SenSanders: BREAKING: Senate Republicans j...,AB
4,2017,6,8,Steven Holland,0,27,RT @AndreaHorwath: Simply appalling. We have t...,ON


In [49]:
# Check what type the 'text' column selection has.
type(pandas_df['text'])

pandas.core.series.Series

In [50]:
# Number of whitespace-separated tokens in each tweet.
pandas_df['word_count'] = pandas_df['text'].map(lambda tweet: len(tweet.split()))

In [51]:
# Length of each tweet in characters (whitespace included).
pandas_df['char_count'] = pandas_df['text'].map(len)

In [52]:
def average_words(x: str) -> float:
    """Return the mean length of the whitespace-separated words in ``x``.

    Parameters
    ----------
    x : str
        Text to measure.

    Returns
    -------
    float
        Total number of characters in all words divided by the number of
        words, or 0.0 when ``x`` contains no words at all (empty or
        whitespace-only text), which would otherwise divide by zero.
    """
    words = x.split()
    if not words:
        # No words to average over; avoid ZeroDivisionError.
        return 0.0
    return sum(len(word) for word in words) / len(words)

In [53]:
# Mean word length per tweet, computed by the average_words helper.
pandas_df['average_word_len'] = pandas_df['text'].apply(average_words)

In [54]:
# Preview the frame with the new word_count / char_count / average_word_len columns.
pandas_df.head()

Unnamed: 0,year,month,day,user_name,favorite_count,retweet_count,text,user_location_ab,word_count,char_count,average_word_len
0,2017,3,11,ADub,0,16427,RT @GeorgeTakei: The GOP wants to ensure the m...,SK,25,139,4.6
1,2017,6,12,Annapolis NDP,1,0,Urgent-health care planning must be based on a...,NS,16,115,6.25
2,2017,6,21,Lethbridge living,1,0,This research project is focused on making pri...,AB,13,107,7.307692
3,2017,6,14,THĒ RÅÏŃ MÅÑ ☔️,0,40988,RT @SenSanders: BREAKING: Senate Republicans j...,AB,19,140,6.421053
4,2017,6,8,Steven Holland,0,27,RT @AndreaHorwath: Simply appalling. We have t...,ON,27,148,4.518519


In [None]:
# Step 5.2: Import TextBlob to analyze the sentiment of 'text'

In [55]:
# Persist the feature-augmented frame as UTF-8 CSV, without the index column.
pandas_df.to_csv(
    'pandas_df.csv',
    index=False,
    encoding='utf-8',
)