In [10]:
# going to dev json to sql with spark using local json file. Then implement on ec2 instance for pipeining. 
import pyspark as ps
import pandas as pd
from pyspark.sql.functions import lit

In [2]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )
sc = spark.sparkContext

In [3]:
# loaded test df (COVID only CO)
df = spark.read.format('json').load("../zip_data/data/test.json")

In [4]:
df.columns

['contributors',
 'coordinates',
 'created_at',
 'display_text_range',
 'entities',
 'extended_entities',
 'extended_tweet',
 'favorite_count',
 'favorited',
 'filter_level',
 'geo',
 'id',
 'id_str',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id',
 'in_reply_to_user_id_str',
 'is_quote_status',
 'lang',
 'limit',
 'place',
 'possibly_sensitive',
 'quote_count',
 'quoted_status',
 'quoted_status_id',
 'quoted_status_id_str',
 'quoted_status_permalink',
 'reply_count',
 'retweet_count',
 'retweeted',
 'retweeted_status',
 'source',
 'text',
 'timestamp_ms',
 'truncated',
 'user',
 'withheld_in_countries']

In [5]:
# subsect colums that I want for 'tweets' table
tweet_col_lst = ['id','user','lang','entities','place','created_at','text','source']
tweet_col_df = df[tweet_col_lst]

In [6]:
tweet_col_df.show(1)

+-------------------+--------------------+----+--------------------+-----+--------------------+--------------------+--------------------+
|                 id|                user|lang|            entities|place|          created_at|                text|              source|
+-------------------+--------------------+----+--------------------+-----+--------------------+--------------------+--------------------+
|1245439863907852289|[false, Mon Feb 2...|  en|[[],, [], [], [[1...| null|Wed Apr 01 19:56:...|RT @ChristinePolo...|<a href="http://t...|
+-------------------+--------------------+----+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 1 row



In [7]:
tweet_col_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- contributors_enabled: boolean (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- default_profile: boolean (nullable = true)
 |    |-- default_profile_image: boolean (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- favourites_count: long (nullable = true)
 |    |-- follow_request_sent: string (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- following: string (nullable = true)
 |    |-- friends_count: long (nullable = true)
 |    |-- geo_enabled: boolean (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- id_str: string (nullable = true)
 |    |-- is_translator: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- listed_count: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- notifications: string (nullable = true)
 |    |

In [8]:
tweet_col_df.cache()
tweet_col_df.count()

5005

In [98]:
tweet_col_df=tweet_col_df.withColumn('state',lit('co'))
tweet_col_df=tweet_col_df.withColumn('search_term_key',lit(1))
tweet_col_df['state']

Column<b'state'>

In [99]:
tweet_col_df.createOrReplaceTempView('tweets')

In [101]:

place = spark.sql('''
    SELECT
        place.`full_name` AS geo_name,
        state,
        search_term
    FROM
        tweets
    WHERE
        (place.`full_name` IS NOT NULL AND
        lang = 'en')
    ''')
print(type(place))
place.show(20)

<class 'pyspark.sql.dataframe.DataFrame'>
+------------------+-----+-----------+
|          geo_name|state|search_term|
+------------------+-----+-----------+
|   Los Angeles, CA|   co|   #COVID19|
|        Denver, CO|   co|   #COVID19|
|          Cambodia|   co|   #COVID19|
|        Denver, CO|   co|   #COVID19|
|    Birmingham, AL|   co|   #COVID19|
|  Fort Collins, CO|   co|   #COVID19|
|        Denver, CO|   co|   #COVID19|
|    Enterprise, NV|   co|   #COVID19|
|     Oklahoma, USA|   co|   #COVID19|
|       Boulder, CO|   co|   #COVID19|
|     Oklahoma, USA|   co|   #COVID19|
|   Fort Carson, CO|   co|   #COVID19|
|Sandra Reyes D.D.S|   co|   #COVID19|
|     Kentucky, USA|   co|   #COVID19|
|     Englewood, CO|   co|   #COVID19|
| New Braunfels, TX|   co|   #COVID19|
|     Daly City, CA|   co|   #COVID19|
|      Uyo, Nigeria|   co|   #COVID19|
|      Lakewood, CO|   co|   #COVID19|
|   Kampala, Uganda|   co|   #COVID19|
+------------------+-----+-----------+
only showing top 20 ro

In [79]:
place.printSchema()

root
 |-- geo_name: string (nullable = true)
 |-- hash_tag_1: string (nullable = true)
 |-- hash_tag_2: string (nullable = true)
 |-- hash_tag_3: string (nullable = true)
 |-- hash_tag_4: string (nullable = true)
 |-- hash_tags_5: string (nullable = true)
 |-- hash_tag_6: string (nullable = true)
 |-- hash_tag_7: string (nullable = true)
 |-- hash_tag_8: string (nullable = true)
 |-- hash_tag_9: string (nullable = true)
 |-- hash_tags_10: string (nullable = true)
 |-- user_mentions: string (nullable = true)



In [84]:
#perform SQL queries to make the tables I want, first tweets table

temp = tweet_col_df.createOrReplaceTempView('tweets')

tweet_tbl = spark.sql('''
    SELECT 
        id AS tweet_id,
        state,
        search_term_key,
        user.`id` AS user_id,
        created_at AS time_created,
        text AS tweet_text,
        source,
        place.`full_name` AS geo_name,
        place.`id` AS geo_id,
        place.`bounding_box`.`coordinates`[0][0][1] AS geo_coords_SW_lat,
        place.`bounding_box`.`coordinates`[0][0][0] AS geo_coords_SW_long,
        place.`bounding_box`.`coordinates`[0][2][1] AS geo_coords_NE_lat,
        place.`bounding_box`.`coordinates`[0][2][0] AS geo_coords_NE_long,
        entities.`hashtags`[0].`text` AS hash_tag_1,
        entities.`hashtags`[1].`text` AS hash_tag_2,
        entities.`hashtags`[2].`text` AS hash_tag_3,
        entities.`hashtags`[3].`text` AS hash_tag_4,
        entities.`hashtags`[4].`text` AS hash_tags_5,
        entities.`user_mentions`[0].`screen_name` AS user_mentions_1,
        entities.`user_mentions`[1].`screen_name` AS user_mentions_2,
        entities.`user_mentions`[2].`screen_name` AS user_mentions_3,
        entities.`user_mentions`[3].`screen_name` AS user_mentions_4,
        entities.`user_mentions`[4].`screen_name` AS user_mentions_5,
        user.`created_at` AS user_date_created,
        user.`location` AS location,
        user.`description` AS description
    FROM tweets
    WHERE
        lang = 'en'
    ''')


In [88]:
spark_schema = tweet_tbl.schema
tweet_tbl.printSchema()

root
 |-- tweet_id: long (nullable = true)
 |-- state: string (nullable = false)
 |-- user_id: long (nullable = true)
 |-- time_created: string (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- source: string (nullable = true)
 |-- geo_name: string (nullable = true)
 |-- geo_id: string (nullable = true)
 |-- geo_coords_SW_lat: double (nullable = true)
 |-- geo_coords_SW_long: double (nullable = true)
 |-- geo_coords_NE_lat: double (nullable = true)
 |-- geo_coords_NE_long: double (nullable = true)
 |-- hash_tag_1: string (nullable = true)
 |-- hash_tag_2: string (nullable = true)
 |-- hash_tag_3: string (nullable = true)
 |-- hash_tag_4: string (nullable = true)
 |-- hash_tags_5: string (nullable = true)
 |-- hash_tags_6: string (nullable = true)
 |-- hash_tags_7: string (nullable = true)
 |-- user_mentions_1: string (nullable = true)
 |-- user_mentions_2: string (nullable = true)
 |-- user_mentions_3: string (nullable = true)
 |-- user_mentions_4: string (nullable = tru

In [86]:
pd_tweets = tweet_tbl.toPandas()

In [87]:
pd_tweets.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2767 entries, 0 to 2766
Data columns (total 29 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   tweet_id            2767 non-null   int64  
 1   state               2767 non-null   object 
 2   user_id             2767 non-null   int64  
 3   time_created        2767 non-null   object 
 4   tweet_text          2767 non-null   object 
 5   source              2767 non-null   object 
 6   geo_name            69 non-null     object 
 7   geo_id              69 non-null     object 
 8   geo_coords_SW_lat   69 non-null     float64
 9   geo_coords_SW_long  69 non-null     float64
 10  geo_coords_NE_lat   69 non-null     float64
 11  geo_coords_NE_long  69 non-null     float64
 12  hash_tag_1          1564 non-null   object 
 13  hash_tag_2          579 non-null    object 
 14  hash_tag_3          219 non-null    object 
 15  hash_tag_4          107 non-null    object 
 16  hash_t

In [25]:
tweet_date = pd_tweets['time_created']
tweet_date[0]

'Wed Apr 01 19:56:17 +0000 2020'

In [26]:
pd.to_datetime(tweet_date[0])

Timestamp('2020-04-01 19:56:17+0000', tz='UTC')

In [32]:
tweet_date_formated = tweet_date.map(lambda x: pd.to_datetime(x))

In [36]:
pd_tweets['state'].astype('string')

0       co
1       co
2       co
3       co
4       co
        ..
2762    co
2763    co
2764    co
2765    co
2766    co
Name: state, Length: 2767, dtype: string

In [37]:
pd_tweets.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2767 entries, 0 to 2766
Data columns (total 15 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   tweet_id           2767 non-null   int64 
 1   state              2767 non-null   object
 2   user_id            2767 non-null   int64 
 3   time_created       2767 non-null   object
 4   tweet_text         2767 non-null   object
 5   source             2767 non-null   object
 6   geo_name           69 non-null     object
 7   geo_id             69 non-null     object
 8   geo_coords         69 non-null     object
 9   geo_coords_two     69 non-null     object
 10  hash_tags          1564 non-null   object
 11  user_mentions      2343 non-null   object
 12  user_date_created  2767 non-null   object
 13  location           1984 non-null   object
 14  description        2342 non-null   object
dtypes: int64(2), object(13)
memory usage: 324.4+ KB


In [43]:
pd_tweets['geo_coords'][pd_tweets['geo_coords'].isna() == False]

34      [-118.668404, 33.704538]
35      [-105.109815, 39.614151]
68         [102.333681, 9.90553]
166     [-105.109815, 39.614151]
191      [-86.967333, 33.405025]
                  ...           
2615     [-104.96747, 39.783752]
2637      [28.861757, -2.838727]
2647       [-1.14452, 52.569949]
2715      [34.266924, 29.393973]
2754     [16.447593, -34.834247]
Name: geo_coords, Length: 69, dtype: object

In [45]:
print(pd_tweets['geo_coords'][34], pd_tweets['geo_coords_two'][34])

[-118.668404, 33.704538] [-118.668404, 34.337041]


In [46]:
pd_tweets['geo_name'][34]

'Los Angeles, CA'

In [103]:
pd_tweets.head(5)

Unnamed: 0,tweet_id,state,user_id,time_created,tweet_text,source,geo_name,geo_id,geo_coords_SW_lat,geo_coords_SW_long,...,user_mentions_1,user_mentions_2,user_mentions_3,user_mentions_4,user_mentions_5,user_mentions_6,user_mentions_7,user_date_created,location,description
0,1245439863907852289,co,968218684702560261,Wed Apr 01 19:56:17 +0000 2020,RT @ChristinePolon1: I just called this hospit...,"<a href=""http://twitter.com/download/android"" ...",,,,,...,ChristinePolon1,,,,,,,Mon Feb 26 20:18:17 +0000 2018,"Michigan, USA",#Truth #MAGA #KAG #Q #QAnon #GreatAwakening #...
1,1245439864130191362,co,41109216,Wed Apr 01 19:56:17 +0000 2020,RT @socialistworker: Hungary’s government effe...,"<a href=""http://twitter.com/download/android"" ...",,,,,...,socialistworker,,,,,,,Tue May 19 12:56:52 +0000 2009,London UK,I AM A CITIZEN OF THE WORLD 🌍\nI am a Globalis...
2,1245439864197218304,co,550338885,Wed Apr 01 19:56:17 +0000 2020,"RT @JustinBrannan: Dear @Amazon,\n\nWith peopl...","<a href=""https://mobile.twitter.com"" rel=""nofo...",,,,,...,JustinBrannan,amazon,,,,,,Tue Apr 10 16:42:11 +0000 2012,,I don’t know her.
3,1245439864331436032,co,1149028897247416320,Wed Apr 01 19:56:17 +0000 2020,UV DISINFECTION - UV LIGHT AND COVID-19 (CORON...,"<a href=""http://twitter.com/download/iphone"" r...",,,,,...,,,,,,,,Wed Jul 10 18:53:31 +0000 2019,"Texas, USA","BSN RN OR/CVOR/IR/GI. Jesus is #1. 🚫DM, 🚫chats..."
4,1245439864943906817,co,1146317592556122112,Wed Apr 01 19:56:17 +0000 2020,RT @KemiOlunloyo: #BREAKING I am calling for ...,"<a href=""http://twitter.com/download/android"" ...",,,,,...,KemiOlunloyo,HE_HopeUzodimma,NCDCgov,Chikwe_I,,,,Wed Jul 03 07:19:46 +0000 2019,,A bio can't explain ME


In [105]:
tweet_text = pd_tweets[['tweet_text', 'description']]

In [106]:
tweet_text['pre_clean_length_text'] = [len(t) for t in tweet_text['tweet_text']]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [108]:
tweet_text.head(5)

Unnamed: 0,tweet_text,description,pre_clean_length_text
0,RT @ChristinePolon1: I just called this hospit...,#Truth #MAGA #KAG #Q #QAnon #GreatAwakening #...,140
1,RT @socialistworker: Hungary’s government effe...,I AM A CITIZEN OF THE WORLD 🌍\nI am a Globalis...,139
2,"RT @JustinBrannan: Dear @Amazon,\n\nWith peopl...",I don’t know her.,140
3,UV DISINFECTION - UV LIGHT AND COVID-19 (CORON...,"BSN RN OR/CVOR/IR/GI. Jesus is #1. 🚫DM, 🚫chats...",129
4,RT @KemiOlunloyo: #BREAKING I am calling for ...,A bio can't explain ME,140


In [109]:
tweet_text[tweet_text.pre_clean_length_text > 140]

Unnamed: 0,tweet_text,description,pre_clean_length_text
15,RT @RepJimBanks: Heartbreaking story about Ai ...,,144
20,RT @Davidshal20: @pulte Due to #COVID19 known ...,Account parodia,148
29,RT @codepink: The people of #Iran are suffocat...,"writer, teacher, mom, cultural and political a...",144
51,RT @jazzzy_10: Due to #COVID19 aka #Coronaviru...,you have to be odd to be number one ~Dr. seuss,144
59,RT @PIB_India: Actor @Suriya_offl appeals peop...,@samsung SEC,144
...,...,...,...
2706,RT @PHLJazzProject: Just as I was putting toge...,“Philadelphia’s most reliable and ambitious pr...,144
2707,RT @TrinityResists: Trump &amp; his regime out...,"Progressive, Mom, Middle Class, Passionate Abo...",152
2713,RT @USAIDEthiopia: To support #COVID19 respons...,"Lecturer @ AAU, CHS, SOM\n\nInternational Medi...",144
2718,RT @MichaelMaidment: Hey #Ottawa. When you dec...,"Director Comms & Gov. Relations @UNHCRCanada, ...",143


In [129]:
tweet_text.tweet_text[15]

'RT @RepJimBanks: Heartbreaking story about Ai Fen, a Chinese doc who raised alarm about #COVID19 in early Dec &amp; is now missing.\n\n@StateDept…'

In [126]:
import numpy as np
mask = np.column_stack([tweet_text[col].str.contains(r"\&", na=False) for col in tweet_text['tweet_text']
tweet_text.loc[mask.any(axis=1)]

SyntaxError: invalid syntax (<ipython-input-126-a5ca3f61012e>, line 3)

In [136]:
mask = tweet_text.tweet_text.str.contains(r"\@realdonaldtrump", na=False)
tweet_text.tweet_text.loc[mask]

2014    RT @LiberalVenom: Jared Kushner thinks we have...
Name: tweet_text, dtype: object

In [111]:
import re
re.sub(r'@[A-Za-z0-9]+','',tweet_text.tweet_text[15])

'RT : Heartbreaking story about Ai Fen, a Chinese doc who raised alarm about #COVID19 in early Dec &amp; is now missing.\n\n…'

In [138]:
pat1 = r'@[A-Za-z0-9]+'
pat2 = r'https?://[A-Za-z0-9./]+'
combined_pat = r'|'.join((pat1, pat2))
combined_pat

'@[A-Za-z0-9]+|https?://[A-Za-z0-9./]+'

In [162]:
tweet_lst = tweet_text.tweet_text.tolist()
tweet_lst[2]

"RT @JustinBrannan: Dear @Amazon,\n\nWith people forced to stay safe inside from #COVID19, \nyou are making more $$ than ever before. \n\nNow's n…"

In [169]:
''.join([c for c in tweet_lst[2] if c not in punctuation])

'RT JustinBrannan Dear Amazon\n\nWith people forced to stay safe inside from COVID19 \nyou are making more  than ever before \n\nNows n…'

In [166]:
tweet_strings = tweet_text['tweet_text'].str.cat(sep=' ')

In [168]:
len(tweet_strings)

358230

In [172]:
import lxml

In [193]:
from nltk.tokenize import WordPunctTokenizer
import lxml
tok = WordPunctTokenizer()
pat1 = r'@[A-Za-z0-9]+'
pat2 = r'https?://[A-Za-z0-9./]+'
combined_pat = r'|'.join((pat1, pat2))

def tweet_cleaner(text):
#     soup = BeautifulSoup(text, 'lxml')
#     souped = soup.get_text()
    stripped = re.sub(combined_pat, '', text)
    try:
        clean = stripped.decode("utf-8-sig").replace(u"\ufffd", "?")
    except:
        clean = stripped
    letters_only = re.sub("[^a-zA-Z]", " ", clean)
    lower_case = letters_only.lower()
    # During the letters_only process two lines above, it has created unnecessay white spaces,
    # I will tokenize and join together to remove unneccessary white spaces
    words = tok.tokenize(lower_case)
    return (" ".join(words)).strip()

In [195]:
test = tweet_cleaner(tweet_lst[29])
print(tweet_lst[29])
print(test)
print(type(test))

RT @codepink: The people of #Iran are suffocating &amp; the people of #Venezuela are suffering as the US continues to impose brutal sanctions o…
rt the people of iran are suffocating amp the people of venezuela are suffering as the us continues to impose brutal sanctions o
<class 'str'>


In [158]:
from string import punctuation
from sklearn.feature_extraction import stop_words
stopwords = stop_words.ENGLISH_STOP_WORDS

def lowercase_text(text):
    return text.lower()

def remove_punctuation(text, punctuation=punctuation):
    return ''.join([c for c in text if c not in punctuation])

def remove_newline(text):
    return text.replace('\n', '')

def split_text_into_words(text):
    return text.split(' ')

def remove_stopwords(word_lst, stopwords_set):
    return [word for word in word_lst if word not in stopwords_set]

def replace_names(word_lst, name_set, replacement_val):
    word_lst_with_replacement = [] 
    for word in word_lst:
        if word in name_set:
            val = replacement_val
        else:
            val = word
        word_lst_with_replacement.append(val)
    return word_lst_with_replacement

def create_cleaned_textline_from_words(words):
    text = ' '.join([word for word in words])
    return text

def line_cleaning_pipeline(text, stopwords_set, name_set, replace_val):
    text_lc = lowercase_text(text)
    text_np = remove_punctuation(text_lc)
    text_nnl = remove_newline(text_np)
    words = split_text_into_words(text_nnl)
    words_nsw = remove_stopwords(words, stopwords_set)
    words_cleaned = replace_names(words_nsw, name_set, replace_val) 
    line_of_text_cleaned = create_cleaned_textline_from_words(words_cleaned)
    return line_of_text_cleaned

