# Data Extraction

This notebook reads through the numerous JSON files and extracts only the relevant tweets based on string matching.

The substrings searched for are actually hashtags, and by nature of which, will be found as exact matches within the tweet text body.

These hashtags would also exist in the _entities_ json element in an array, which will be used later for exploration.

In [1]:
# Put these at the top of every notebook, to get automatic reloading and inline plotting
from IPython.core.display import display, HTML
import pandas as pd
%reload_ext autoreload
%autoreload 1
%matplotlib inline

import warnings
warnings.filterwarnings('ignore')

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
pd.set_option('max_colwidth',100)    

display(HTML("<style>.container { width:95% !important; }</style>"))

In [1]:
import sys
import matplotlib.pyplot as plt
import seaborn as sns
import re
import json
from itertools import combinations, takewhile
import collections

from simhash import Simhash, SimhashIndex

import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# from pyspark.ml.feature import OneHotEncoderEstimator
# OneHotEncoderEstimator is available starting from Spark 2.3
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
!hdfs dfs -ls -h '/user/ivy2/Tweets/' > '/home/sriharis/git_projects/BigDataEngg/final_project/file_list.txt'
tweets_path = '/user/ivy2/Tweets/'



In [30]:
# json_file = "hdfs:///user/ivy2/Tweets/tweets*.json"
json_file = "hdfs:///user/ivy2/Tweets/tweets2017*.json"
df=spark.read.json(json_file)

In [31]:
fields_to_keep = ["id_str", 
                  "text",
                  "in_reply_to_status_id_str",
                  "in_reply_to_user_id_str", 
                  "created_at",
                  # User columns
                  "user.id_str",
                  "user.name",
                  "user.followers_count",
                  "user.favourites_count",
                  "user.statuses_count",
                  "user.friends_count",
                  # Other attributes
                  "coordinates",
                  "favorite_count",
                  "entities.hashtags",
                  "favorited", 
                  "place.country",
                  "place.country_code",
                  "place.name",
                  "place.place_type",
                  # Retweet columns
                  "retweet_count", 
                  "retweeted",
                  "retweeted_status.user.id_str",
                  "retweeted_status.user.name"
                 ]

df = df.selectExpr("id_str", 
              "text",
              "in_reply_to_status_id_str",
              "in_reply_to_user_id_str", 
              "created_at",
              "user.id_str as user_id_str",
              "user.name as user_name",
              "user.followers_count as user_followers_count",
              "user.favourites_count as user_favorites_count",
              "user.statuses_count as user_statuses_count",
              "user.friends_count as user_friends_count",
              "coordinates",
              "favorite_count",
              "entities.hashtags as entities_hashtags",
              "favorited", 
              "place.country as place_country",
              "place.country_code as place_country_code",
              "place.name as place_name",
              "place.place_type as place_type",
              "retweet_count", 
              "retweeted",
              "retweeted_status.user.id_str as retweeted_status_user_id_str",
              "retweeted_status.user.name as retweeted_status_user_name")
df.show(2)

+------------------+--------------------+-------------------------+-----------------------+--------------------+-----------+---------+--------------------+--------------------+-------------------+------------------+-----------+--------------+-----------------+---------+-------------+------------------+----------+----------+-------------+---------+----------------------------+--------------------------+
|            id_str|                text|in_reply_to_status_id_str|in_reply_to_user_id_str|          created_at|user_id_str|user_name|user_followers_count|user_favorites_count|user_statuses_count|user_friends_count|coordinates|favorite_count|entities_hashtags|favorited|place_country|place_country_code|place_name|place_type|retweet_count|retweeted|retweeted_status_user_id_str|retweeted_status_user_name|
+------------------+--------------------+-------------------------+-----------------------+--------------------+-----------+---------+--------------------+--------------------+------------

In [32]:
df.count()

1489935

In [34]:
# uc_favored_tags = ["uchicago", "uchearing", "universityofchicago", "pritzkerschoolofmedicine",
#                    "uofc", "maroonmade", "chicagobooth"]
# nw_favored_tags = ["northwestern"]
# upenn_favored_tags = ["upenn", "penn", "uofpenn", "universityofpennsylvania", "pennlaw", "upennhearing"]
# uic_favored_tags = ["UIC", "UICProud"]
# mit_favored_tags = ["MIT"]
# stanford_favored_tags = ["stanford"]

df_university  = df.where('lower(text) like "%uchicago%"\
                            or lower(text) like "%uchearing%"\
                            or lower(text) like "%universityofchicago%"\
                            or lower(text) like "%pritzkerschoolofmedicine%"\
                            or lower(text) like "%uofc%"\
                            or lower(text) like "%chicagobooth%"\
                            or lower(text) like "%maroonmade%"\
                            or lower(text) like "%northwestern%"\
                            or lower(text) like "%upenn%"\
                            or lower(text) like "%upennhearing%"\
                            or lower(text) like "%penn%"\
                            or lower(text) like "%uofpenn%"\
                            or lower(text) like "%universityofpennsylvania%"\
                            or lower(text) like "%pennlaw%"\
                            or lower(text) like "%uicproud%"\
                            or lower(text) like "%uic %"\
                            or lower(text) like "%stanford%"'
                         )#or lower(text) like "%mit %"\
df_university.cache()
df_university.count()

31831

In [35]:
df_university.select("entities_hashtags").limit(50).show()

+--------------------+
|   entities_hashtags|
+--------------------+
|[[[7, 19], abeced...|
|                  []|
|[[[36, 44], Argon...|
|  [[[112, 115], Wx]]|
|[[[67, 76], Break...|
|[[[108, 114], dad...|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|   [[[91, 95], CBC]]|
|[[[43, 57], Yuvas...|
|                  []|
|                  []|
|[[[103, 111], B1G...|
|[[[91, 99], CBCNe...|
|                  []|
|                  []|
+--------------------+
only showing top 20 rows



In [36]:
!hdfs dfs -rm -r "hdfs:///user/sriharis/project"

2019-03-22 02:10:21,528 INFO  [main] fs.TrashPolicyDefault (TrashPolicyDefault.java:moveToTrash(182)) - Moved: 'hdfs://nameservice1/user/sriharis/project' to trash at: hdfs://nameservice1/user/sriharis/.Trash/Current/user/sriharis/project1553238621501


In [37]:
hdfsdir = "hdfs:///user/sriharis/project"
df_university.coalesce(1).write.format("json").save(hdfsdir)

In [38]:
!hadoop fs -ls -h /user/sriharis/project

Found 2 items
-rw-r--r--   3 sriharis sriharis          0 2019-03-22 02:10 /user/sriharis/project/_SUCCESS
-rw-r--r--   3 sriharis sriharis     16.8 M 2019-03-22 02:10 /user/sriharis/project/part-00000-ad6a81ae-9524-4fa5-97e1-9babda4718a3-c000.json


-----------------------------