In [138]:
from pyspark.sql import functions as F

In [139]:
dataset = spark.read.json("/root/twitter.medium")

A function to parse the date:

In [140]:
from datetime import datetime
from operator import add

def parse_date(date_field):
    date_parts = date_field.split()
    date_str = "%s/%s/%s:%s" % (date_parts[2],
                                date_parts[1],
                                date_parts[5],
                                date_parts[3])
    return datetime.strptime(date_str, '%d/%b/%Y:%H:%M:%S')

Register temp table for SQL

In [141]:
dataset.registerTempTable("tweets")

# Create keyspace

In [142]:
#create KEYSPACE assignment2 WITH replication =  
#{'class': 'SimpleStrategy', 'replication_factor': 1};

# Tweets-oriented

Tweets per day, month, year

In [143]:
#Cassandra code:
#create table tweets_per_day ( 
#tday varchar primary key, tcount double);

In [144]:
tweets_per_day = dataset.rdd\
.map(lambda x: (parse_date(x.created_at)\
                .strftime("%Y-%m-%d"), 1))\
.reduceByKey(add)

In [145]:
tweets_per_day.take(5)

[('2013-03-07', 452),
 ('2013-03-10', 741),
 ('2013-03-05', 419),
 ('2013-03-03', 819),
 ('2013-03-16', 585)]

In [146]:
tweets_per_day = tweets_per_day.toDF()

#converting rdd to dataFrame

In [147]:
tweets_per_day = tweets_per_day.toDF('tday', 'tcount')
tweets_per_day.select("tday", "tcount")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="tweets_per_day", keyspace="assignment2")\
.save(mode="overwrite")

#renaming the columns of the dataFrame and adding the data of 
#dataFrame to Cassandra table

In [148]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="tweets_per_day")\
.show(5, False)

#verification: data written in cassandra table or not

+----------+------+
|tday      |tcount|
+----------+------+
|2013-03-19|348.0 |
|2013-03-12|354.0 |
|2013-03-03|819.0 |
|2013-03-06|383.0 |
|2013-03-18|476.0 |
+----------+------+
only showing top 5 rows



Interactions per day, month, year

In [149]:
#Cassandra code:
#create table interactions_per_day ( 
#interday varchar primary key, intercount double);

In [186]:
interactions_per_day = dataset.rdd\
.map(lambda x: (parse_date(x.created_at)\
                .strftime("%Y-%m-%d"), \
                x.favorite_count + x.retweet_count))\
.reduceByKey(add)

In [187]:
interactions_per_day.take(5)

[('2013-03-07', 14),
 ('2013-03-10', 35),
 ('2013-03-05', 17),
 ('2013-03-03', 25),
 ('2013-03-16', 31)]

In [188]:
interactions_per_day = interactions_per_day.toDF()

#converting rdd to dataFrame

In [189]:
interactions_per_day = interactions_per_day\
.toDF('interday', 'intercount')

interactions_per_day.select("interday", "intercount")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="interactions_per_day", keyspace="assignment2")\
.save(mode="overwrite")

#renaming the columns of the dataFrame and adding the data of 
#dataFrame to Cassandra table

In [190]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="interactions_per_day")\
.show(5, False)

#verification: data written in cassandra table or not

+----------+----------+
|interday  |intercount|
+----------+----------+
|2013-03-19|25.0      |
|2013-03-12|29.0      |
|2013-03-03|25.0      |
|2013-03-06|3.0       |
|2013-03-18|82.0      |
+----------+----------+
only showing top 5 rows



### Creating a single structure for 2 RDDs

In [198]:
tweets_per_day.registerTempTable("temp_tweets")
interactions_per_day.registerTempTable("temp_inter")
temp_join = spark.sql("select tday,tcount,intercount from temp_tweets "\
                     + "full outer join temp_inter on "\
                     + "temp_tweets.tday = temp_inter.interday")

In [199]:
temp_join.show(5, False)

+----------+------+----------+
|tday      |tcount|intercount|
+----------+------+----------+
|2013-03-14|335   |20        |
|2013-03-05|419   |17        |
|2013-03-07|452   |14        |
|2013-03-19|348   |25        |
|2013-03-09|634   |30        |
+----------+------+----------+
only showing top 5 rows



Loading this structure in Cassandra

In [200]:
temp_join.printSchema()

root
 |-- tday: string (nullable = true)
 |-- tcount: long (nullable = true)
 |-- intercount: long (nullable = true)



In [201]:
#create table temp_join (
#               ... tday varchar primary key,
#               ... tcount double,
#               ... intercount double);

In [202]:
temp_join.select("tday", "tcount", "intercount")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="temp_join", keyspace="assignment2")\
.save(mode="overwrite")

#writing data into cassandra table

In [203]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="temp_join")\
.show(5, False)

#verifying that data is present in the table

+----------+----------+------+
|tday      |intercount|tcount|
+----------+----------+------+
|2013-03-19|25.0      |348.0 |
|2013-03-12|29.0      |354.0 |
|2013-03-03|25.0      |819.0 |
|2013-03-06|3.0       |383.0 |
|2013-03-18|82.0      |476.0 |
+----------+----------+------+
only showing top 5 rows



# Movie-oriented Analysis

Tweets per movie per day

In [130]:
# create table movies_tweets_per_day (
#               ... movie_title varchar primary key,
#               ... mdate varchar,
#               ... tweets double);

In [122]:
movies_tweets_per_day = dataset.rdd\
.map(lambda x: ((x.entities.urls[0].display_url, \
                 parse_date(x.user.created_at)\
                 .strftime("%Y-%m-%d")), 1))\
.reduceByKey(add)

In [123]:
movies_tweets_per_day = movies_tweets_per_day\
.map(lambda (k,v): (k[0],k[1],v))

#re-mapping the key value pair by breaking the key into two parts

In [124]:
movies_tweets_per_day = movies_tweets_per_day.toDF()

#converting into dataframe

In [126]:
movies_tweets_per_day = movies_tweets_per_day\
.toDF('movie_title', 'mdate', 'tweets')

#renaming data frame

In [127]:
movies_tweets_per_day.printSchema()

root
 |-- movie_title: string (nullable = true)
 |-- mdate: string (nullable = true)
 |-- tweets: long (nullable = true)



In [128]:
movies_tweets_per_day.select("movie_title", "mdate", "tweets")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="movies_tweets_per_day", keyspace="assignment2")\
.save(mode="overwrite")

#writing data into cassandra table

In [129]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="movies_tweets_per_day")\
.show(5, False)

#verifying that data is present in the table

+------------------------+----------+------+
|movie_title             |mdate     |tweets|
+------------------------+----------+------+
|imdb.com/title/tt0408777|2011-01-17|1.0   |
|imdb.com/title/tt0480025|2009-02-10|1.0   |
|imdb.com/title/tt1014759|2010-09-10|1.0   |
|imdb.com/title/tt2088735|2010-05-10|1.0   |
|imdb.com/title/tt1027820|2009-05-14|1.0   |
+------------------------+----------+------+
only showing top 5 rows



Engagement per movie

In [156]:
movies_engagement = spark.sql("select entities.urls[0].display_url as movie, "+\
               "sum(favorite_count) + sum(retweet_count) as engagement "
               "from tweets "+\
               "group by entities.urls[0].display_url")

#using temp table to get engagements per movie

In [157]:
movies_engagement.show(5, False)

+------------------------+----------+
|movie                   |engagement|
+------------------------+----------+
|imdb.com/title/tt0319343|0         |
|imdb.com/title/tt1542344|0         |
|imdb.com/title/tt0467197|0         |
|imdb.com/title/tt0061722|0         |
|imdb.com/title/tt2075373|0         |
+------------------------+----------+
only showing top 5 rows



In [158]:
movies_engagement.printSchema()

root
 |-- movie: string (nullable = true)
 |-- engagement: long (nullable = true)



In [159]:
#Code to create movies_engagement table in Cassandra:
#create table movies_engagement (
#               ... movie varchar primary key,
#               ... engagement double);

In [160]:
movies_engagement.select("movie", "engagement")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="movies_engagement", keyspace="assignment2")\
.save(mode="overwrite")

#adding the data of dataFrame to Cassandra table

In [161]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="movies_engagement")\
.show(5, False)

#verification that data has been added into Cassandra table

+------------------------+----------+
|movie                   |engagement|
+------------------------+----------+
|imdb.com/title/tt0408777|0.0       |
|imdb.com/title/tt0480025|1.0       |
|imdb.com/title/tt1014759|0.0       |
|imdb.com/title/tt2088735|0.0       |
|imdb.com/title/tt1027820|0.0       |
+------------------------+----------+
only showing top 5 rows



Popular movies per language

In [162]:
movies_language_pop = dataset.select(F.explode("entities.urls").alias("col"), F.col("lang").alias("language"))\
    .select(F.col("col.display_url").alias("movie"), "language")\
    .groupBy("movie", "language")\
    .count()
    
#creating dataframe to calculate  popular movies per language

In [163]:
movies_language_pop.show(5, False)

+------------------------+--------+-----+
|movie                   |language|count|
+------------------------+--------+-----+
|imdb.com/title/tt1486192|en      |10   |
|imdb.com/title/tt1232200|en      |5    |
|imdb.com/title/tt0082398|en      |1    |
|imdb.com/title/tt0082508|en      |1    |
|imdb.com/title/tt0441773|sk      |1    |
+------------------------+--------+-----+
only showing top 5 rows



In [164]:
movies_language_pop.printSchema()

root
 |-- movie: string (nullable = true)
 |-- language: string (nullable = true)
 |-- count: long (nullable = false)



In [165]:
#Code to create movies_language_pop table in Cassandra:
#create table movies_language_pop (
#              ... movie varchar primary key,
#              ... language varchar,
#              ... count double);

In [166]:
movies_language_pop.select("movie", "language", "count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="movies_language_pop", keyspace="assignment2")\
.save(mode="overwrite")

#adding the data of dataFrame to Cassandra table

In [167]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="movies_language_pop")\
.show(5, False)

#verification that data has been added into Cassandra table

+------------------------+-----+--------+
|movie                   |count|language|
+------------------------+-----+--------+
|imdb.com/title/tt0408777|1.0  |en      |
|imdb.com/title/tt0480025|3.0  |en      |
|imdb.com/title/tt1014759|2.0  |en      |
|imdb.com/title/tt2088735|3.0  |en      |
|imdb.com/title/tt1027820|1.0  |en      |
+------------------------+-----+--------+
only showing top 5 rows



# Users-oriented

Number of followers, favourites, statuses and listings per user, oldest and newest data:

In [168]:
user_stats = spark.sql("select x.screen_name, " \
          + "t1.user.followers_count as old_followers, t1.user.favourites_count as old_favourites, t1.user.statuses_count as old_statuses, t1.user.listed_count as old_listed, " \
          + "t2.user.followers_count as new_followers, t2.user.favourites_count as new_favourites, t2.user.statuses_count as new_statuses, t2.user.listed_count as new_listed " \
          + "from (select user.screen_name, max(created_at) as new, min(created_at) as old from tweets group by user.screen_name) x " 
          + "join tweets t1 on t1.user.screen_name = x.screen_name and t1.created_at = x.old " \
          + "join tweets t2 on t2.user.screen_name = x.screen_name and t2.created_at = x.new")

#creating data frame for user statistics

In [169]:
user_stats.printSchema()

root
 |-- screen_name: string (nullable = true)
 |-- old_followers: long (nullable = true)
 |-- old_favourites: long (nullable = true)
 |-- old_statuses: long (nullable = true)
 |-- old_listed: long (nullable = true)
 |-- new_followers: long (nullable = true)
 |-- new_favourites: long (nullable = true)
 |-- new_statuses: long (nullable = true)
 |-- new_listed: long (nullable = true)



In [170]:
#Code to create user_stats table in Cassandra: 
#create table user_stats (
#... screen_name varchar primary key,
#... old_followers double,
#... old_favourites double,
#... old_statuses double,
#... old_listed double,
#... new_followers double,
#... new_favourites double,
#... new_statuses double,
#... new_listed double
#... );

In [171]:
user_stats.select("screen_name", "old_followers", "old_favourites"\
                  ,"old_statuses", "old_listed", "new_followers"\
                  ,"new_favourites", "new_statuses", "new_listed")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="user_stats", keyspace="assignment2")\
.save(mode="overwrite")

#adding the data of dataFrame to Cassandra table

In [172]:
spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="user_stats")\
.show(5, False)

#verification that data has been added into Cassandra table

+-------------+--------------+-------------+----------+------------+--------------+-------------+----------+------------+
|screen_name  |new_favourites|new_followers|new_listed|new_statuses|old_favourites|old_followers|old_listed|old_statuses|
+-------------+--------------+-------------+----------+------------+--------------+-------------+----------+------------+
|Chris_G_Elias|3.0           |135.0        |1.0       |2025.0      |3.0           |135.0        |1.0       |2024.0      |
|dartheseus   |6.0           |150.0        |8.0       |6125.0      |6.0           |150.0        |8.0       |6125.0      |
|edd_b        |808.0         |905.0        |11.0      |7583.0      |808.0         |905.0        |11.0      |7583.0      |
|MirkleyJo    |15.0          |109.0        |0.0       |7638.0      |15.0          |109.0        |0.0       |7638.0      |
|spongetwan   |313.0         |118.0        |2.0       |11226.0     |313.0         |118.0        |2.0       |11226.0     |
+-------------+---------

1. What are the top 20 most popular movies?

In [173]:
pop_movies = spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="movies_engagement")

#reading data from cassandra into a dataframe

In [174]:
pop_movies.registerTempTable("popular_movies")

#creating a temp table on dataframe

In [175]:
spark.sql("select movie, "\
          + "engagement from popular_movies "\
          + "order by engagement desc")\
.show(20,False)

#using sql to get answer

+------------------------+----------+
|movie                   |engagement|
+------------------------+----------+
|imdb.com/title/tt0434139|61.0      |
|imdb.com/title/tt1045658|19.0      |
|imdb.com/title/tt1707386|18.0      |
|imdb.com/title/tt1623205|18.0      |
|imdb.com/title/tt0454876|15.0      |
|imdb.com/title/tt1047011|13.0      |
|imdb.com/title/tt1024648|11.0      |
|imdb.com/title/tt0840361|9.0       |
|imdb.com/title/tt1659337|9.0       |
|imdb.com/title/tt1389096|9.0       |
|imdb.com/title/tt1966604|7.0       |
|imdb.com/title/tt0151804|6.0       |
|imdb.com/title/tt0110912|5.0       |
|imdb.com/title/tt1673434|5.0       |
|imdb.com/title/tt0838283|5.0       |
|imdb.com/title/tt1637725|5.0       |
|imdb.com/title/tt1853728|5.0       |
|imdb.com/title/tt1772341|5.0       |
|imdb.com/title/tt0405094|4.0       |
|imdb.com/title/tt1649419|4.0       |
+------------------------+----------+
only showing top 20 rows



3.What is the most popular movie in the group of Spanish-speaking users?

In [176]:
pop_movies_lang=spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="movies_language_pop")

#reading data from cassandra into a dataframe

In [177]:
pop_movies_lang.registerTempTable("popular_movies_es")

#creating a temp table on dataframe

In [178]:
spark.sql("select movie, "\
          + "count from popular_movies_es "\
          + "where language = 'es' "\
          + "order by count desc")\
.show(5, False)

#using sql to get answer

+------------------------+-----+
|movie                   |count|
+------------------------+-----+
|imdb.com/title/tt2023587|2.0  |
|imdb.com/title/tt1845846|2.0  |
|imdb.com/title/tt1781769|2.0  |
|imdb.com/title/tt1680133|2.0  |
|imdb.com/title/tt1392888|1.0  |
+------------------------+-----+
only showing top 5 rows



2.In which month we collected the most interactions?

In [204]:
interPerDay = spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="interactions_per_day")

#reading data from cassandra into a dataframe

In [205]:
interPerDay.registerTempTable("interactions_per_day")

#creating a temp table on dataframe

In [206]:
interPerMonth = interPerDay\
.select(interPerDay['interday'].substr(6,2).alias('tmonth'),
       interPerDay['intercount'])\
.groupby('tmonth')\
.agg({'intercount': 'sum'})

#using cassandra operations on data frame to obtain answer

In [207]:
interPerMonth.show(1)

+------+---------------+
|tmonth|sum(intercount)|
+------+---------------+
|    03|          472.0|
+------+---------------+
only showing top 1 row



4.What are the users with the most changes in numbers of followers between first and the last tweet?

In [183]:
uStats = spark.read.format("org.apache.spark.sql.cassandra")\
.load(keyspace="assignment2", table="user_stats")

#reading data from cassandra into a dataframe

In [184]:
uStats.registerTempTable("user_stats")

#creating a temp table on dataframe

In [185]:
spark.sql("select screen_name, "\
          + "new_followers-old_followers as changes "\
          + "from user_stats "\
          + "order by changes desc")\
.show(5, False)

#using sql to get answer

+---------------+-------+
|screen_name    |changes|
+---------------+-------+
|DevilsBallBag  |16.0   |
|TheArsenal77   |9.0    |
|carlosshue     |7.0    |
|MaxLikesNOODLES|5.0    |
|stephenjcleary |5.0    |
+---------------+-------+
only showing top 5 rows

