# Assignment 2

#### First Name:  Montserrat
#### Last Name: Comas

*Note: the input file path points to DBFS (the Databricks File System), because the Spark notebook Docker image does not run on my MacBook with an M1 chip.*

## 1. Load Data from JSON

In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook, naming the application "Twitter Analysis".
spark = (
    SparkSession.builder
    .appName("Twitter Analysis")
    .getOrCreate()
)

In [0]:
# Load the raw tweets from the JSON file into a DataFrame.
df_twitter = spark.read.json("/FileStore/tables/corona_tweet_new.json")

In [0]:
# Inspect the schema of the raw tweets before selecting columns.
df_twitter.printSchema()

In [0]:
### From the nested `user` column keep only id_str, followers_count, friends_count and created_at,
### flattened into top-level columns carrying a `user_` prefix.
# (2 points)
from pyspark.sql.functions import col

# Tweet-level columns that are kept unchanged.
tweet_level_cols = [
    "created_at",
    "favorite_count",
    "hashtags",
    "id",
    "in_reply_to_status_id",
    "in_reply_to_user_id_str",
    "location",
    "reply_count",
    "retweet_count",
    "source",
]

# Fields pulled out of the nested `user` struct and renamed.
user_level_cols = [
    col("user.id_str").alias("user_id_str"),
    col("user.followers_count").alias("user_followers_count"),
    col("user.friends_count").alias("user_friends_count"),
    col("user.created_at").alias("user_created_at"),
]

df_twitter = df_twitter.select(*tweet_level_cols, *user_level_cols)

In [0]:
# Inspect the schema again to confirm the selected and flattened columns.
df_twitter.printSchema()

In [0]:
# Print the total number of records in df_twitter (1 point)
df_twitter.count()

In [0]:
# Extract the source label from the `source` column by dropping the anchor tag, and store it
# in a new column named extracted_source.
# For example: <a href="https://mobile.twitter.com" rel="nofollow">Twitter Web App</a> => Twitter Web App
# The regular expression "<a [^>]+>([^<]+)" captures the label in group 1.
# (4 points)
from pyspark.sql.functions import regexp_extract, col

# Column expression holding the text between the opening anchor tag and the next "<".
source_label = regexp_extract(col('source'), "<a [^>]+>([^<]+)", 1)

df_twitter = df_twitter.withColumn('extracted_source', source_label)
df_twitter.select('extracted_source', 'source').show()

In [0]:
# Convert the DataFrame into an RDD.
# Each Row is turned into a plain tuple, so the RDD cells address fields by position.
rdd_twitter=df_twitter.rdd.map(tuple)

In [0]:
# Create a temporary in-memory table named `twitter` (1 point)
# The spark.sql cells query the DataFrame through this view.
df_twitter.createOrReplaceTempView("twitter")

## 2. Analyze Data

#### You will write code to answer the questions listed below in three ways: using only RDDs, using the DataFrame API, and using Spark SQL

- Analyze using RDD 
- Analyze using Dataframe without temp table 
- Analyze using spark.sql with temp table

In [0]:
# Map every column name to its position, so that fields of the RDD tuples can be looked up by name.
columns_dic = {name: position for position, name in enumerate(df_twitter.columns)}
columns_dic

In [0]:
# Quick check: position of user_id_str in the column-name -> position mapping.
columns_dic['user_id_str']

#### 2.1 Get total number of unique users (1 point for each type)

In [0]:
# Using RDD
# Project every tweet onto its user id, then keep each id once.
rdd_user_ids = rdd_twitter.map(lambda row: row[columns_dic['user_id_str']])
rdd_user_count = rdd_user_ids.distinct()
rdd_user_count.count()

In [0]:
# Using DataFrame
# One row per distinct user id.
df_user_count = df_twitter.select('user_id_str').dropDuplicates()
df_user_count.count()

In [0]:
# Using spark.sql and the temporary table.
# Counts the distinct user ids in the `twitter` view.
spark.sql('SELECT COUNT(DISTINCT(user_id_str)) FROM twitter').show()

#### 2.2 Get count of users who have more than 1 tweet in the data (2 points)

In [0]:
import pyspark.sql.functions as F

In [0]:
# Using RDD
# countByValue() returns a local {user_id: number_of_tweets} mapping.
rdd_tweet_authors = rdd_twitter.map(lambda row: row[columns_dic['user_id_str']])
rdd_user_tweet_count = rdd_tweet_authors.countByValue()

# Keep the distinct user ids whose tweet count exceeds one, and count them.
rdd_user_count.filter(lambda user_id: rdd_user_tweet_count[user_id] > 1).count()

In [0]:
# Using DataFrame
# Number of tweets (by tweet id) per user, then the users with more than one.
tweets_per_user = F.count('id').alias('num_tweets')
df_user_tweet_count = df_twitter.groupBy('user_id_str').agg(tweets_per_user)
df_user_tweet_count.where(F.col('num_tweets') > 1).count()

In [0]:
# Using spark.sql and the temporary table.
# The inner query counts tweets per user; the outer query counts the users with more than one tweet.
spark.sql("SELECT count(num_tweets) \
          FROM (\
            SELECT count(id) as num_tweets \
            FROM twitter \
            group by user_id_str\
          ) where num_tweets>1 ").show()

#### 2.3 Get total number of unique extracted_source values (1 point each)

In [0]:
# Using RDD
# Project every tweet onto its extracted source, then keep each source once.
rdd_sources = rdd_twitter.map(lambda row: row[columns_dic['extracted_source']])
rdd_source_distinct = rdd_sources.distinct()
rdd_source_distinct.count()

In [0]:
# Using DataFrame
# One row per distinct extracted source.
df_source_distinct = df_twitter.select('extracted_source').dropDuplicates()
df_source_distinct.count()

In [0]:
# Using spark.sql and the temporary table.
# Counts the distinct extracted_source values in the `twitter` view.
spark.sql("SELECT \
           COUNT(DISTINCT(extracted_source)) \
           FROM twitter").show()

#### 2.4 Get top 5 most used extracted_source

In [0]:
# Using RDD (5 points)
# Local {source: number_of_tweets} mapping.
rdd_source_count = rdd_twitter.map(lambda row: row[columns_dic['extracted_source']]).countByValue()

# Order the distinct sources by descending tweet count and keep the first five.
top_5_source = rdd_source_distinct.takeOrdered(5, key=lambda source: -rdd_source_count[source])
top_5_counts = [rdd_source_count[source] for source in top_5_source]
top_5_source, top_5_counts

In [0]:
# Using DataFrame (2 points)
# Tweets per extracted source, shown from most to least used (top five only).
source_usage = F.count('extracted_source').alias('count_source')
df_source_count = df_twitter.groupBy("extracted_source").agg(source_usage)
display(df_source_count.orderBy(F.desc('count_source')).head(5))

extracted_source,count_source
Twitter for Android,6262
Twitter for iPhone,5698
Twitter Web App,2878
Twitter for iPad,428
Twitter Web Client,136


In [0]:
# Using spark.sql and the temporary table. (2 points)
# Tweets per extracted source, ordered by descending count; show(5) prints the first five rows.
spark.sql("SELECT \
          extracted_source, \
          COUNT(extracted_source) as count_source \
          FROM twitter \
          GROUP BY extracted_source \
          ORDER BY count_source DESC").show(5)

#### 2.5 Get count of distinct hashtags used (5 points each)

In [0]:
# Using RDD
# Flatten the per-tweet hashtag lists into one RDD of individual hashtags.
# A tweet whose `hashtags` field is null contributes nothing (`or []`), instead of making
# flatMap fail while iterating over None; the explode-based DataFrame and SQL versions
# likewise produce no rows for such tweets.
rdd_hashtags = rdd_twitter.flatMap(lambda row: row[columns_dic['hashtags']] or [])
rdd_hashtags.distinct().count()

In [0]:
# Using DataFrame
# One row per hashtag occurrence via explode, de-duplicated, then counted.
df_distinct_hashtags = df_twitter.select(F.explode('hashtags')).distinct()
df_distinct_hashtags.count()

In [0]:
# Using spark.sql and the temporary table.
# The inner query explodes the hashtag arrays and keeps distinct values; the outer query counts them.
spark.sql("SELECT \
          count(aux_col) \
          FROM ( \
            SELECT \
            DISTINCT(EXPLODE(hashtags)) as aux_col \
            FROM twitter \
          ) aux").show()

#### 2.6 Get top 5 hashtags

In [0]:
# Using RDD (4 points)
# Local {hashtag: occurrences} mapping and the RDD of distinct hashtags.
hashtags_dic = rdd_hashtags.countByValue()
keys_rdd = rdd_hashtags.distinct()

# Pair each hashtag with its count and take the five pairs with the largest counts.
rdd_hashtag_counts = keys_rdd.map(lambda tag: (tag, hashtags_dic[tag]))
rdd_top_5_hashtags = rdd_hashtag_counts.takeOrdered(5, key=lambda pair: -pair[1])
rdd_top_5_hashtags

In [0]:
# Using DataFrame (2 points)
# One row per hashtag occurrence.
exploded_hashtag = F.explode(F.col('hashtags')).alias('expl_hasthags')
hashtags_df = df_twitter.select(exploded_hashtag)

# Occurrences per hashtag, shown from most to least frequent (top five only).
hashtag_occurrences = F.count('expl_hasthags').alias('hashtag_count')
df_count_hashtags = hashtags_df.groupBy("expl_hasthags").agg(hashtag_occurrences)

display(df_count_hashtags.orderBy(F.desc('hashtag_count')).head(5))

expl_hasthags,hashtag_count
طبق_القدرات_للثانويه_ياريس,385
Corona,319
OilPrice,251
COVID19,125
corona,123


In [0]:
# Display the column-name -> tuple-position mapping used by the RDD cells.
columns_dic

In [0]:
# Using spark.sql and the temporary table. (2 points)
# The inner query explodes the hashtag arrays; the outer query counts occurrences per hashtag,
# ordered by descending count; show(5) prints the first five rows.
spark.sql("SELECT \
          expl_hashtags, \
          COUNT(expl_hashtags) as count_hasthag  \
          FROM (\
            SELECT EXPLODE(hashtags) as expl_hashtags \
            FROM twitter \
          ) \
          GROUP BY expl_hashtags \
          ORDER BY count_hasthag DESC").show(5)

#### 2.7 Get total number of tweets which are retweeted more than 100 times

In [0]:
# Using RDD
# Keep only the tweets retweeted more than 100 times, then count them.
rdd_retweet = rdd_twitter.filter(
    lambda row: row[columns_dic['retweet_count']] > 100
)
rdd_retweet.count()

In [0]:
# Using DataFrame
# Keep only the tweets retweeted more than 100 times, then count them.
retweeted_over_100 = F.col("retweet_count") > 100
df_retweet = df_twitter.where(retweeted_over_100)
df_retweet.count()

In [0]:
# Using spark.sql and the temporary table.
# Counts the tweet ids whose retweet_count exceeds 100.
spark.sql("SELECT \
          count(id) \
          FROM twitter \
          WHERE retweet_count>100 ").show()


#### 2.8 Get top 3 most retweeted tweets per country (8 points)

In [0]:
from pyspark.sql.window import Window

In [0]:
# Display the column-name -> tuple-position mapping used by the RDD cells.
columns_dic

In [0]:
# Using RDD
import numpy as np  # no longer used by this cell; kept in case other cells rely on `np`


def _top_3_by_retweets(tweets):
    """Return up to three [id, retweet_count] pairs with the highest retweet_count.

    The pairs are sorted as whole units on their retweet_count (index 1), highest first,
    so every id stays attached to its own count. Sorting the two columns independently
    (as np.sort(..., axis=0) does) would mix ids and counts from different tweets.
    """
    return sorted(tweets, key=lambda tweet: tweet[1], reverse=True)[:3]


# Key every tweet by location, gather the [id, retweet_count] pairs of each location,
# and keep the three most retweeted tweets per location.
rdd_retweet_country = (
    rdd_twitter
    .map(lambda row: (row[columns_dic['location']],
                      [row[columns_dic['id']], row[columns_dic['retweet_count']]]))
    .groupByKey()
    .mapValues(_top_3_by_retweets)
)

rdd_retweet_country.collect()

In [0]:
# Using DataFrame
# Rank the tweets of each location by descending retweet_count.
# row_number() numbers rows 1, 2, 3, ... without repeats, so the filter below keeps at most
# three tweets per location even when retweet counts tie (dense_rank() would keep every tied
# row). This matches the ROW_NUMBER() used by the spark.sql version of this question.
location_by_retweets = Window.partitionBy('location').orderBy(F.desc('retweet_count'))

df_retweet_country = (
    df_twitter
    .select('id', 'location', 'retweet_count')
    .withColumn('rt_loc_rank', F.row_number().over(location_by_retweets))
)

display(df_retweet_country.filter(F.col('rt_loc_rank') <= 3))

id,location,retweet_count,rt_loc_rank
1252335430323888128,Canada,9997,1
1252254877939531776,Canada,9992,2
1252252082825986051,Canada,9987,3
1252253612140490759,Chile,9988,1
1252334891951427585,Chile,9984,2
1252253710182481920,Chile,9978,3
1252335780707684352,China,9998,1
1252253596516843520,China,9993,2
1252255562525560832,China,9984,3
1252334028092399622,Germany,9999,1


In [0]:
# Using spark.sql and the temporary table.
# The inner query numbers the tweets of each location by descending retweet_count;
# the outer query keeps the rows numbered 1 to 3.
spark.sql("SELECT * \
          FROM (\
            SELECT \
            location,\
            id,\
            retweet_count,\
            ROW_NUMBER() \
            OVER (PARTITION BY location ORDER BY retweet_count DESC) as rt_loc_rank \
            FROM twitter\
          ) aux_table \
          WHERE rt_loc_rank <= 3").show()

#### 2.9 Total number of tweets per country

In [0]:
# Using RDD (3 points)
# Project every tweet onto its location; countByValue() then tallies tweets per location.
rdd_loc = rdd_twitter.map(
    lambda row: row[columns_dic['location']]
)
rdd_loc.countByValue()

In [0]:
# Using DataFrame (2 points)
# Count rows per location. count('*') counts every row of the group, whereas
# count('location') skips null values and would report 0 tweets for the null-location group.
df_loc = df_twitter.groupBy('location').agg(F.count('*').alias('tweets_per_country'))
df_loc.show()

In [0]:
# Using spark.sql and the temporary table. (1 point)
# COUNT(*) counts every row of the group, whereas COUNT(location) skips NULL values and
# would report 0 tweets for the NULL-location group.
spark.sql("SELECT \
          location,\
          COUNT(*) as tweets_per_country \
          FROM twitter \
          GROUP BY location").show()

## 3. Save Data

#### 3.1 Save the data such that you have a separate folder per country (2 points)

In [0]:
# Using DataFrame
# partitionBy("location") writes one sub-folder per location value.
# mode("overwrite") replaces a previous export so the notebook can be re-run from the top;
# the default save mode raises an error when the target path already exists.
df_twitter.write.mode("overwrite").partitionBy("location").format("json").save("corona_tweet_by_loc.json")

#### 3.2 Save the data as parquet files (1 point)

In [0]:
# Using DataFrame
# partitions by location: df_twitter.write.partitionBy("location").format("parquet").save("corona_tweet_by_loc.parquet")

# default partitions:
# mode("overwrite") replaces a previous export so the notebook can be re-run from the top;
# the default save mode raises an error when the target path already exists.
df_twitter.write.mode("overwrite").format("parquet").save("corona_tweet_out.parquet")