In [1]:
# findspark has to run before any pyspark import: init() puts the Spark
# installation's Python libraries on sys.path so that pyspark can be imported
import findspark
findspark.init()

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# start a Spark session, or reuse the one that is already running
spark = SparkSession.builder.getOrCreate()

In [2]:
spark

In [3]:
# import packages
import os 
import pickle

import re
from datetime import datetime

import requests

import pandas as pd
import numpy

import pyspark.sql.functions as F
from pyspark.sql.types import *

from random import randint
from time import sleep



# 2. Twitter Data

## 2.1. Import json data

<font size="3">Next, we will import the Twitter data, which is also stored in JSON files. 
This data will be used to improve the predictive accuracy of the model.</font>

In [4]:
# define data dir
data_dir = "../Kickstarter_Big_Data_Assignment"
# get all twitter files: keep only the JSON files so that other entries in the
# folder (checkpoints, sub-directories, ...) are never handed to the JSON reader;
# sorted because os.listdir returns the entries in arbitrary order
tweet_files = sorted(
    os.path.join(data_dir, obs)
    for obs in os.listdir(data_dir)
    if obs.lower().endswith(".json")
)

In [5]:
# check
tweet_files

['./Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200306_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200309.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200312.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200312_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200316.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327_part_2.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327_part_3.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200405.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200405_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200421.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200421_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200429.json',
 './Kickstart

In [6]:
# get tweet dates: every path carries its date as 8 digits (YYYYMMDD)
# raw string so that \d reaches the regex engine instead of being an invalid
# string escape; the greedy leading .* selects the last 8-digit run in the path
date_pattern = re.compile(r".*(\d{8})")
tweet_dates = [datetime.strptime(date_pattern.search(obs).group(1), "%Y%m%d") for obs in tweet_files]

In [7]:
# get idx of tweets that were posted in time frame (start inclusive, end exclusive)
period_start = datetime(2020, 3, 1)
period_end = datetime(2021, 10, 1)
tweet_idx = [idx for idx, tweet_date in enumerate(tweet_dates) if period_start <= tweet_date < period_end]

In [8]:
# keep only the files whose date falls inside the selected period
tweet_files_period = [tweet_files[position] for position in tweet_idx]

In [9]:
# check
tweet_files_period

['./Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200306_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200309.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200312.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200312_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200316.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327_part_2.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200327_part_3.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200405.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200405_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200421.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200421_part_1.json',
 './Kickstarter_Big_Data_Assignment\\tweets_kck.st_20200429.json',
 './Kickstart

In [10]:
print(len(tweet_files_period))

861


In [11]:
# read all selected tweet files into one Spark DataFrame
twitter_df = spark.read.json(path=tweet_files_period)

In [12]:
# check number of tweets
twitter_df.count()

825846

## 2.2. Parse json data

<font size="3">Next, we will parse the JSON files to obtain one row of useful features per tweet.</font>

In [13]:
# check schema
twitter_df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

In [14]:
# Which id should we use for deduplication?
#F.col("id").alias('Tweet_id')

In [15]:
# select interesting features, listed as (field in the raw tweet, output column name)
feature_aliases = [
    ("created_at", "tweet_created_at"),
    ("full_text", "tweet_full_text"),
    ("lang", "tweet_lang"),
    ("retweet_count", "tweet_retweet_count"),
    # fields of the user that posted the tweet
    ("user.followers_count", "User_followers_count"),
    ("user.friends_count", "User_friends_count"),
    ("user.favourites_count", "User_favourites_count"),
    ("user.name", "User_name"),
    ("user.lang", "User_lang"),
    ("user.screen_name", "User_screen_name"),
    ("user.statuses_count", "User_status_count"),
    ("user.verified", "User_verified"),
    ("user.listed_count", "User_listed_count"),
    # entities is about the tweet itself
    ("entities.hashtags.text", "Tweet_hashtags"),
    ("entities.urls.expanded_url", "Tweet_expanded_url"),
    ("entities.media.type", "Tweet_media_type"),
    ("id_str", "Tweet_id"),
]
twitter_sub = twitter_df.select(*[F.col(source).alias(alias) for source, alias in feature_aliases])

In [16]:
# check
#print(twitter_sub.count())

In [17]:
twitter_sub.limit(5).toPandas()

Unnamed: 0,tweet_created_at,tweet_full_text,tweet_lang,tweet_retweet_count,User_followers_count,User_friends_count,User_favourites_count,User_name,User_lang,User_screen_name,User_status_count,User_verified,User_listed_count,Tweet_hashtags,Tweet_expanded_url,Tweet_media_type,Tweet_id
0,Wed Apr 14 12:06:11 +0000 2021,RT @EvrimMarriott: Our first Kickstarter proje...,en,1,59,438,1211,marrila,,marrila29,621,False,2,[],[],,1382304173266513921
1,Wed Apr 14 12:04:16 +0000 2021,RT @LimitedRunDoug: Back Ghosts! Just found ou...,en,10,15392,5218,5368,Dead Northern,,dead_northern,2489,False,27,[],[],,1382303689336049665
2,Wed Apr 14 12:03:12 +0000 2021,RT @SoftNotWeak: KICKSTARTER IS LIVE!\nhttps:/...,en,1311,397,865,54513,the drk knight hellzone™ 👀 patch 5.5,,mannerminded,19878,False,46,[],[http://kck.st/3a88qAc],[photo],1382303423954100228
3,Wed Apr 14 12:02:50 +0000 2021,TWO HOURS LEFT TO GET THE CTHULHU STRETCH GOAL...,en,0,435,184,2916,Whitney Delaglio,,widdledragon,1358,False,10,[],[http://kck.st/3993Tgk],,1382303328483352578
4,Wed Apr 14 12:00:23 +0000 2021,TWO HOURS LEFT TO GET THE CTHULHU STRETCH GOAL...,en,0,435,184,2916,Whitney Delaglio,,widdledragon,1358,False,10,[],[http://kck.st/3993Tgk],,1382302713598345221


## 2.3. Preprocess data

<font size="3">Now that the data is parsed, we can do some additional preprocessing.</font>

In [18]:
# remove retweets and exact duplicate rows, and keep only tweets that carry a
# url, since we can only use tweets with urls
first_url = F.col("Tweet_expanded_url")[0]
twitter_processed = (
    twitter_sub
    # address the text by the name it has in twitter_sub
    .filter(~F.col("tweet_full_text").startswith("RT"))
    # a real null test instead of a comparison with the string 'null'
    .filter(first_url.isNotNull())
    .drop_duplicates()
    .cache()
)

In [19]:
#print(twitter_processed.count())

# 3. Merge Data

<font size="3">Finally, we are going to merge the Twitter data with the Kickstarter data.
One way to merge both datasets is by the project URL, since this URL is almost always included in a tweet about a Kickstarter project.</font>

In [20]:
# check twitter urls
twitter_urls = twitter_processed.toPandas()

In [21]:
# keep only the first url of every tweet; values that are already a plain string
# are left untouched, so re-running this cell does not reduce a url to its first character
twitter_urls["Tweet_expanded_url"] = twitter_urls["Tweet_expanded_url"].apply(
    lambda urls: urls if isinstance(urls, str) else urls[0]
)

In [22]:
twitter_urls["Tweet_expanded_url"].shape

(181886,)

In [23]:
len(twitter_urls["Tweet_expanded_url"].unique())

73109

What we can observe is that many URLs appear multiple times. Resolving a short URL to its long form takes quite some time, so resolving the same URL over and over would mean a lot of useless waiting. Instead, we will build a unique set of the short URLs and resolve only those. Afterwards, we will merge these short/long URL pairs back onto the Twitter data to get the full URL for all the tweets. 

In [24]:
import numpy as np
# keep the url only when it is a shortened kck.st link, otherwise NaN
# regex=False: match "kck.st" literally (as a regex, the dot would match any character)
# na=False: a missing url counts as "no match"
is_short_url = twitter_urls['Tweet_expanded_url'].str.contains('kck.st', regex=False, na=False)
twitter_urls['small_urls'] = np.where(is_short_url, twitter_urls['Tweet_expanded_url'], np.nan)

In [25]:
# unique shortened urls, in order of first appearance
all_twitter_urls = twitter_urls["small_urls"].dropna().unique().tolist()

In [26]:
print(len(all_twitter_urls))
print(len(set(all_twitter_urls)))

6657
6657


In [27]:
# inspect results
all_twitter_urls[:5]

['http://kck.st/3dW8J2j',
 'http://kck.st/3g74V0Q',
 'http://kck.st/2z2WQah',
 'http://kck.st/2WnlRbV',
 'http://kck.st/3mw4eBy']

We will expand the URLs using the function below.

In [28]:
# define function to extract expanded url
def resolve_url(url, timeout=10, session=None):
    """Request a (shortened) url and report the url of the final response.

    Parameters
    ----------
    url : str
        The url to resolve.
    timeout : float or tuple, optional
        Passed on as the ``timeout`` of the GET request (in seconds), so that
        an unresponsive server cannot block the caller indefinitely.
    session : object, optional
        Object with a requests-compatible ``get`` method, for example a
        ``requests.Session`` that is shared between calls. When omitted, the
        ``requests`` module itself is used.

    Returns
    -------
    tuple
        ``(url, longurl)``; ``longurl`` is the ``url`` attribute of the
        response, or ``None`` when the request raised a ``RequestException``
        (timeouts included) or the status code was not 200.
    """
    http = requests if session is None else session

    try:
        r = http.get(url, timeout=timeout)
    except requests.exceptions.RequestException:
        # best effort: an unreachable url is reported as unresolved
        return (url, None)

    return (url, r.url if r.status_code == 200 else None)

In [29]:
# short url -> long url (None when the url could not be resolved)
twitter_expanded_urls = dict()

for counter, url in enumerate(all_twitter_urls, start=1):
    # pause a random 0-5 seconds before each request to space the requests out
    sleep(randint(0,5))
    # progress report every 100 urls
    if counter % 100 == 0:
        print("extracted %s urls, urls in dataframe: %s" %(counter, len(twitter_expanded_urls)))

    # check if url is shortened
    if "kck.st" in url:
        short_url, long_url = resolve_url(url)
        twitter_expanded_urls[short_url] = long_url

extracted 100 urls, urls in dataframe: 99
extracted 200 urls, urls in dataframe: 199
extracted 300 urls, urls in dataframe: 299
extracted 400 urls, urls in dataframe: 399
extracted 500 urls, urls in dataframe: 499
extracted 600 urls, urls in dataframe: 599
extracted 700 urls, urls in dataframe: 699
extracted 800 urls, urls in dataframe: 799
extracted 900 urls, urls in dataframe: 899
extracted 1000 urls, urls in dataframe: 999
extracted 1100 urls, urls in dataframe: 1099
extracted 1200 urls, urls in dataframe: 1199
extracted 1300 urls, urls in dataframe: 1299
extracted 1400 urls, urls in dataframe: 1399
extracted 1500 urls, urls in dataframe: 1499
extracted 1600 urls, urls in dataframe: 1599
extracted 1700 urls, urls in dataframe: 1699
extracted 1800 urls, urls in dataframe: 1799
extracted 1900 urls, urls in dataframe: 1899
extracted 2000 urls, urls in dataframe: 1999
extracted 2100 urls, urls in dataframe: 2099
extracted 2200 urls, urls in dataframe: 2199
extracted 2300 urls, urls in d

In [30]:
pd.DataFrame.from_dict(twitter_expanded_urls, orient='index', columns = ["Tweet_expanded_url"]).reset_index()

Unnamed: 0,index,Tweet_expanded_url
0,http://kck.st/3dW8J2j,https://www.kickstarter.com/projects/jaguarhug...
1,http://kck.st/3g74V0Q,https://www.kickstarter.com/projects/offintoth...
2,http://kck.st/2z2WQah,https://www.kickstarter.com/projects/hirukoa/s...
3,http://kck.st/2WnlRbV,https://www.kickstarter.com/projects/sugarmelo...
4,http://kck.st/3mw4eBy,https://www.kickstarter.com/projects/manriquez...
...,...,...
6651,http://kck.st/37amG90,https://www.kickstarter.com/projects/152483846...
6652,http://kck.st/3fGVKQZ,https://www.kickstarter.com/projects/mobygrip/...
6653,http://kck.st/33q57C4,https://www.kickstarter.com/projects/tailstory...
6654,http://kck.st/3hHnMP3,https://www.kickstarter.com/projects/silversid...


In [31]:
# lookup table with one row per short url: url as found in the tweet -> resolved long url
urls_tweets_df = pd.DataFrame(
    list(twitter_expanded_urls.items()),
    columns=["Tweet_expanded_url", "Long_url"],
)

In [33]:
#urls_tweets_df.to_csv("urls_short_long.csv", sep = ';')

We can now merge back the short/long urls to the twitter dataset to get the correct urls per tweet.

In [34]:
twitter_urls.shape

(181886, 18)

In [35]:
# left join keeps every tweet; validate raises if a short url occurs more than
# once in the lookup table, which would otherwise silently multiply tweet rows
tweets_output = twitter_urls.merge(
    urls_tweets_df,
    on="Tweet_expanded_url",
    how="left",
    validate="many_to_one",
)

We can now use this dataset to get the correct URL for each tweet, which we will do based on the null values in the expanded URLs: where no long URL is available, the URL from the tweet itself is kept.

In [36]:
# use the resolved long url when there is one, otherwise keep the url from the tweet
has_long_url = tweets_output['Long_url'].notnull()
tweets_output["project_url"] = np.where(has_long_url, tweets_output['Long_url'], tweets_output['Tweet_expanded_url'])

In [37]:
# the helper url columns are superseded by project_url, so remove them
obsolete_columns = ["Tweet_expanded_url", "Long_url", "small_urls"]
tweets_output = tweets_output.drop(obsolete_columns, axis="columns")

In [40]:
tweets_output.head()

Unnamed: 0,tweet_created_at,tweet_full_text,tweet_lang,tweet_retweet_count,User_followers_count,User_friends_count,User_favourites_count,User_name,User_lang,User_screen_name,User_status_count,User_verified,User_listed_count,Tweet_hashtags,Tweet_media_type,Tweet_id,small_urls,project_url
0,Tue Apr 13 23:15:25 +0000 2021,Find Brixes at Kickstarter now!!\nhttps://t.co...,en,1,24,200,15,Hugo Corona,,hugocrown,858,False,0,"[boardgames, alebrijes, mexico, PokemonGOCommu...",[photo],1382110201847894020,http://kck.st/3dW8J2j,https://www.kickstarter.com/projects/jaguarhug...
1,Thu Apr 15 03:50:07 +0000 2021,Hi night crowd my name is brentt harshman and ...,en,26,2701,3384,132678,brentt harshman,,BrenttHarshman,66985,False,40,[],,1382541721263742978,http://kck.st/3g74V0Q,https://www.kickstarter.com/projects/offintoth...
2,Tue Jul 07 08:44:23 +0000 2020,Recordad que han pasado días pero la primera a...,es,3,109,258,607,Neophenix,,neophenix007,444,False,1,[SuperHighSchool],,1280422397355724800,http://kck.st/2z2WQah,https://www.kickstarter.com/projects/hirukoa/s...
3,Sun Apr 19 17:56:19 +0000 2020,Hey friendos!! If you back the @kickstarter fo...,en,4,829,1581,16909,Jim Ousley (Butcher Queen 2 - Kickstarter 4.14...,,JimOusley,4355,False,43,"[makecomics, crowdfunding, comics]",,1251932655827943426,,https://www.kickstarter.com/projects/butcherqu...
4,Wed Aug 25 09:47:43 +0000 2021,Hey Lovelies! My Kickstarter for the Teyvat To...,en,0,52,290,4154,Kousu,,Kousubae,751,False,0,[GenshinImpact],[photo],1430466911968583680,http://kck.st/2WnlRbV,https://www.kickstarter.com/projects/sugarmelo...


In [38]:
# write the result as a semicolon-separated file; the row index is written as the first column
tweets_output.to_csv("tweets_data.csv", sep=";", index=True)