In [1]:
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
import json
import pandas as pd
import os
from sqlalchemy import create_engine
from collections import defaultdict

In [2]:
database = "twitter_spark"
user = "root"
password  = "Qwertyw@123"
path = "/home/nineleaps/PycharmProjects/spark_twitter/data/"

# path = "/home/nineleaps/Desktop/Day-22-24-20200610T055939Z-001/Day-22-24/data/"

In [3]:
engine = create_engine("mysql+pymysql://{user}:{pw}@localhost/{db}"
                       .format(user="root",
                               pw="Qwerty@123",
                               db="twitter_spark"))
print(engine)

Engine(mysql+pymysql://root:***@localhost/twitter_spark)


In [4]:
df = spark\
.readStream.format("socket")\
.option("host","localhost")\
.option("port","9001")\
.load()

In [5]:
df\
.writeStream\
.format("json")\
.option("path", path)\
.option("checkpointLocation", path)\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f5e65604da0>

In [6]:
# Getting JSON File Name that Contains Data.

file_path=[]
main_file = ''

for file in os.listdir(path):
    if file.endswith(".json"):
        file_path.append(path+file)

for file in file_path:
    if os.stat(file).st_size != 0:
        main_file = file
        
print(main_file)

/home/nineleaps/Desktop/Day-22-24-20200610T055939Z-001/Day-22-24/data/part-00000-c67d1a3b-45a9-4928-b966-970d63718f2f-c000.json


In [7]:
# Reading Data From JSON File and Storing Into Dataframe.

f = open(main_file, "r")
file_content = json.loads(f.read())['value']

s = list(file_content.split('}'))
l = s
arr=[]
s[:] = [i + '}' for i in s]
for i in l[:-1]:
    arr.append(json.loads(i))

for t in arr:
    temp = t['hashtag'].split(',')
    t['hashtag'] = ','.join(list(set(temp)))

df = pd.DataFrame(arr)

In [8]:
# Populating Data Into SQL Table.

df.to_sql('feed_data', con = engine, if_exists = 'replace', index=False)

In [9]:
# Reading Data From SQL Table.

result = engine.execute('SELECT* FROM feed_data')
for i in result.fetchmany(3):
    print(i)

('Ghana', '#Hushpuppi', 'Wed Jun 10 09:51:31 +0000 2020')
('United States', '#GoneWithTheWind', 'Wed Jun 10 09:51:32 +0000 2020')
('India', '#1YearOfFainatsTBK', 'Wed Jun 10 09:51:34 +0000 2020')


In [10]:
# Creating Spark Dataframe Object.

myJson = sc.parallelize(arr)
myDf = sqlContext.read.json(myJson)
myDf.show(10)

+-------------+--------------------+--------------------+
|      country|             hashtag|                time|
+-------------+--------------------+--------------------+
|        Ghana|          #Hushpuppi|Wed Jun 10 09:51:...|
|United States|    #GoneWithTheWind|Wed Jun 10 09:51:...|
|        India|  #1YearOfFainatsTBK|Wed Jun 10 09:51:...|
|United States|          #Pakistani|Wed Jun 10 09:51:...|
|United States|  #earthquake,#quake|Wed Jun 10 09:51:...|
|United States|  #earthquake,#quake|Wed Jun 10 09:51:...|
|        Kenya|  #CoronaHasTaughtMe|Wed Jun 10 09:51:...|
|        India|#CoronaFreeBharat...|Wed Jun 10 09:51:...|
|        Ghana|       #DaybreakHitz|Wed Jun 10 09:51:...|
|        India|#JhootiHaiBiharSa...|Wed Jun 10 09:51:...|
+-------------+--------------------+--------------------+
only showing top 10 rows



# Solution 1.

In [11]:
# Getting Location wise most popular hashtags.

my_li = []
unique_country_list = myDf.select('country').distinct().collect()

for country in unique_country_list:
    a = myDf.filter(myDf.country == country[0]).select(myDf.hashtag).collect()
    li=[]
    for i in a:
        li.extend([j.lstrip() for j in i[0].split(',')])
    
    d = defaultdict(int)
    for i in li:
        d[i] += 1
    result = max(d.items(), key=lambda x: x[1])

        
    my_li.append({
        'country': country[0],
        'most_popular_hashtag': result[0]
    })

# Creating Spark Dataframe Object.
myJson1 = sc.parallelize(my_li)
df1 = sqlContext.read.json(myJson1)
df1.show(10)

+--------------------+--------------------+
|             country|most_popular_hashtag|
+--------------------+--------------------+
|Hashemite Kingdom...|         #NiajeNiaje|
|             Türkiye|               #MOOD|
|               Ghana|       #DaybreakHitz|
|       United States|         #earthquake|
|               India|  #1YearOfFainatsTBK|
|             Nigeria|#LateMorningMusic...|
|             Grenada|          #Ghosttown|
|              Uganda|        #VisitUganda|
|               Kenya|  #CoronaHasTaughtMe|
+--------------------+--------------------+



In [12]:
# Populating Data Into SQL Table.

df1_sql = pd.DataFrame(my_li)
df1_sql.to_sql('most_popular_hashtag', con = engine, if_exists = 'replace', index=False)

In [13]:
# Reading Data From SQL Table.

result = engine.execute('SELECT* FROM most_popular_hashtag')
for i in result.fetchmany(3):
    print(i)

('Hashemite Kingdom of Jordan', '#NiajeNiaje')
('Türkiye', '#MOOD')
('Ghana', '#DaybreakHitz')


# Solution 2.

In [14]:
# Getting most popular hashtags used in combination with.

comb_list = []
for i in my_li:
    a = myDf.filter(myDf["hashtag"].contains(i['most_popular_hashtag'])).select(myDf['hashtag']).collect()
    
    hashtags_list=[]
    for k in a:
        hashtags_list.extend([j.lstrip() for j in k[0].split(',')])
        
    li = [j for j in hashtags_list if j != i['most_popular_hashtag']]
    comb_hashtag = ','.join(list(set(li)))

    if comb_hashtag != '':        
        comb_list.append({
        'country': i['country'],
        'most_popular_hashtag': i['most_popular_hashtag'],
        'combination_hashtags': comb_hashtag})
        
# Creating Spark Dataframe Object.
myJson2 = sc.parallelize(comb_list)
df2 = sqlContext.read.json(myJson2)
df2.show(10)

+--------------------+-------------+--------------------+
|combination_hashtags|      country|most_popular_hashtag|
+--------------------+-------------+--------------------+
|              #quake|United States|         #earthquake|
|       #summerwalker|      Nigeria|#LateMorningMusic...|
|       #LakeBunyonyi|       Uganda|        #VisitUganda|
+--------------------+-------------+--------------------+



In [15]:
# Populating Data Into SQL Table.

df2_sql = pd.DataFrame(comb_list)
df2_sql.to_sql('most_popular_hashtag_combination', con = engine, if_exists = 'replace', index=False)

In [16]:
# Reading Data From SQL Table.

result = engine.execute('SELECT* FROM most_popular_hashtag_combination')
for i in result.fetchmany(3):
    print(i)

('United States', '#earthquake', '#quake')
('Nigeria', '#LateMorningMusicMixLive', '#summerwalker')
('Uganda', '#VisitUganda', '#LakeBunyonyi')


# Solution 3.

In [17]:
# Getting Tweet Frequency.

list3 = []

for i in my_li:
    a = myDf.filter(myDf["hashtag"].contains(i['most_popular_hashtag'])).select([myDf['hashtag'], myDf['time']]).collect()
    for j in a:
          list3.append({
              'country': i['country'],
              'hashtag': i['most_popular_hashtag'],
              'time': j[1]
          })
            
# Creating Spark Dataframe Object.
myJson3 = sc.parallelize(list3)
df3 = sqlContext.read.json(myJson3)
df3.groupby("country","hashtag").count().show(10)

+--------------------+--------------------+-----+
|             country|             hashtag|count|
+--------------------+--------------------+-----+
|               Ghana|       #DaybreakHitz|    2|
|               Kenya|  #CoronaHasTaughtMe|    1|
|              Uganda|        #VisitUganda|    1|
|       United States|         #earthquake|    2|
|             Grenada|          #Ghosttown|    1|
|             Türkiye|               #MOOD|    1|
|             Nigeria|#LateMorningMusic...|    1|
|Hashemite Kingdom...|         #NiajeNiaje|    1|
|               India|  #1YearOfFainatsTBK|    2|
+--------------------+--------------------+-----+



In [18]:
# Populating Data Into SQL Table.

df3_sql = pd.DataFrame(list3)
df3_sql.to_sql('tweet_frequency', con = engine, if_exists = 'replace', index=False)

In [19]:
# Reading Data From SQL Table.

result = engine.execute('SELECT* FROM tweet_frequency')
for i in result.fetchmany(3):
    print(i)

('Hashemite Kingdom of Jordan', '#NiajeNiaje', 'Wed Jun 10 09:52:15 +0000 2020')
('Türkiye', '#MOOD', 'Wed Jun 10 09:51:55 +0000 2020')
('Ghana', '#DaybreakHitz', 'Wed Jun 10 09:51:46 +0000 2020')
