In [None]:
# Importing the libraries needed
import pandas as pd
import numpy as np
import json
import scipy 
import torch
from torch.utils.data import Dataset, DataLoader
import logging
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [1 InRelease 14.2 kB/88.7                                                                                Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [1 InRelease 88.7 kB/88.7                                                                                Hit:3 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
                                                                               Hit:4 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (18.65.39.68)] [Wait                                                                               Hit:5 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
                                          

In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-08-29 06:45:02--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.4’


2022-08-29 06:45:03 (1.59 MB/s) - ‘postgresql-42.2.16.jar.4’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("UA_War").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [None]:
#import data from aws s3 bucket for 08-18-22. This will be the starting dataframe to populate DB

from pyspark import SparkFiles
url = "https://databootcamps3bucket.s3.us-west-2.amazonaws.com/ua_war/UkraineWar/0818_UATweets.csv.gz"
spark.sparkContext.addFile(url)

df_8018 = spark.read.option("delimiter", ",").option("encoding", "UTF-8").option("multiLine", True).option("escape", '"').csv(SparkFiles.get("0818_UATweets.csv.gz"),  header=True, inferSchema=True)

# df.show(truncate=False) 
df_8018_2 = df_8018.na.fill("<empty>")

df_8018_2.show(5)

+---+-------------------+---------------+--------------------+------------------+---------+---------+-----------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+--------+-----------+--------------+----------+-----------------+---------------------+-----------------------+---------------------+-------------------+-----------------------+---------------+----------------+--------------------+----------------------+--------------------+
|_c0|             userid|       username|            acctdesc|          location|following|followers|totaltweets|       usercreatedts|            tweetid|     tweetcreatedts|retweetcount|                text|            hashtags|language|coordinates|favorite_count|is_retweet|original_tweet_id|original_tweet_userid|original_tweet_username|in_reply_to_status_id|in_reply_to_user_id|in_reply_to_screen_name|is_quote_status|quoted_status_id|quoted_status_userid|quoted_status_username|         ext

In [None]:
#see what the datatypes for df_8018 
df_8018_2.dtypes

[('_c0', 'int'),
 ('userid', 'bigint'),
 ('username', 'string'),
 ('acctdesc', 'string'),
 ('location', 'string'),
 ('following', 'int'),
 ('followers', 'int'),
 ('totaltweets', 'int'),
 ('usercreatedts', 'string'),
 ('tweetid', 'bigint'),
 ('tweetcreatedts', 'string'),
 ('retweetcount', 'int'),
 ('text', 'string'),
 ('hashtags', 'string'),
 ('language', 'string'),
 ('coordinates', 'string'),
 ('favorite_count', 'int'),
 ('is_retweet', 'boolean'),
 ('original_tweet_id', 'int'),
 ('original_tweet_userid', 'int'),
 ('original_tweet_username', 'string'),
 ('in_reply_to_status_id', 'bigint'),
 ('in_reply_to_user_id', 'bigint'),
 ('in_reply_to_screen_name', 'string'),
 ('is_quote_status', 'boolean'),
 ('quoted_status_id', 'bigint'),
 ('quoted_status_userid', 'bigint'),
 ('quoted_status_username', 'string'),
 ('extractedts', 'string')]

In [None]:
#use this to determine which type of drop is needed. Also not all all dates have same number of columns 
len(df_8018_2.columns)

29

In [None]:
from pyspark.sql.functions import *

#1. Drop Uncessary Columns using the data set of 08-18-2022 as a starting point. 
cleaned_df = df_8018_2.drop("userid", "_c0", "acctdesc", "location", "tweetid", "coordinates", "original_tweet_id", "original_tweet_userid", "original_tweet_username", "in_reply_to_status_id", "in_reply_to_screen_name", 'in_reply_to_user_id', "quoted_status_id", "quoted_status_username", "quoted_status_userid")

#2. change columns from string to year:month:day date format
cleaned_df = cleaned_df.withColumn("usercreatedts",to_date("usercreatedts"))
cleaned_df = cleaned_df.withColumn("tweetcreatedts",to_date("tweetcreatedts"))
cleaned_df = cleaned_df.withColumn("extractedts",to_date("extractedts"))

#3. filter out langauge = english only
cleaned_df = cleaned_df.filter(cleaned_df["language"]=="en")
cleaned_df = cleaned_df.filter(cleaned_df["usercreatedts"] >= "2009-01-01")

cleaned_df.show(5)

+---------------+---------+---------+-----------+-------------+--------------+------------+--------------------+--------------------+--------+--------------+----------+---------------+-----------+
|       username|following|followers|totaltweets|usercreatedts|tweetcreatedts|retweetcount|                text|            hashtags|language|favorite_count|is_retweet|is_quote_status|extractedts|
+---------------+---------+---------+-----------+-------------+--------------+------------+--------------------+--------------------+--------+--------------+----------+---------------+-----------+
|DogandCatHelpe1|        5|       39|        685|   2021-02-02|    2022-08-18|           1|Welcome to our sh...|[{'text': 'Ukrain...|      en|             3|     false|          false| 2022-08-18|
|        orfecon|       27|     3071|      15040|   2017-01-06|    2022-08-18|           0|Will the #sanctio...|[{'text': 'sancti...|      en|             0|     false|          false| 2022-08-18|
|    cgtnameric

In [None]:
cleaned_df_count = cleaned_df.count()
cleaned_df_count

22451

In [2]:
all_dates_arr = ["0224", "0227", "0228p1", "0228p2", "0301", "0302", "0303", "0304", "0305", "0306", "0307", "0308", "0309","0310", "0311", "0312", "0313", "0314", "0315", "0316", "0317", "0318", "0319", "0320", "0321", "0322", "0323", "0324", "0325", "0326", "0327", "0328", "0329", "0330", "0331",
 "0401", "0402", "0403", "0404", "0405", "0406", "0407", "0408", "0409","0410", "0411", "0412", "0413", "0414", "0415", "0416", "0417", "0418", "0419", "0420", "0421", "0422", "0423", "0424", "0425", "0426", "0427", "0428", "0429", "0430",
 "0501", "0502", "0503", "0504", "0505", "0506", "0507", "0508", "0509","0510", "0511", "0512", "0513", "0514", "0515", "0516", "0517", "0518", "0519", "0520", "0521", "0522", "0523", "0524", "0525", "0526", "0527", "0528", "0529", "0530", "0531",
 "0601", "0602", "0603", "0604", "0605", "0606", "0607", "0608", "0609","0610", "0611", "0612", "0613", "0614", "0615", "0616", "0617", "0618", "0619", "0620", "0621", "0622", "0623", "0624", "0625", "0626", "0627", "0628", "0629", "0630",
 "0701", "0702", "0703", "0704", "0705", "0706", "0707", "0708", "0709","0710", "0711", "0712", "0713", "0714", "0715", "0716", "0717", "0718", "0719", "0720", "0721", "0722", "0723", "0724", "0725", "0726", "0727", "0728", "0729", "0730", "0731",
 "0801", "0802", "0803", "0804", "0805", "0806", "0807", "0808", "0809","0810", "0811", "0812", "0813", "0814", "0815", "0816", "0817", "0818"]

In [None]:
from pyspark import SparkFiles
from pyspark.sql.functions import *

url = "https://databootcamps3bucket.s3.us-west-2.amazonaws.com/ua_war/UkraineWar"

for day in all_dates_arr:
      #load the data from aws
      aws_url = f"{url}/{day}_UATweets.csv.gz"
      spark.sparkContext.addFile(aws_url)
      temp_df = spark.read.option("delimiter", ",").option("encoding", "UTF-8").option("multiLine", True).option("escape", '"').csv(SparkFiles.get(f"{day}_UATweets.csv.gz"),  header=True, inferSchema=True)
      
      #keep count number of columns to determine which if else block it will hit
      temp_count = len(temp_df.columns)

      #change columns from string to year:month:day date format
      temp_df = temp_df.withColumn("usercreatedts",to_date("usercreatedts"))
      temp_df = temp_df.withColumn("tweetcreatedts",to_date("tweetcreatedts"))
      temp_df = temp_df.withColumn("extractedts",to_date("extractedts"))

      #filter out data for english only 
      temp_df = temp_df.filter(temp_df["language"]=="en")
      #filter out usercreated after 2009 
      temp_df = temp_df.filter(temp_df["usercreatedts"] >= "2009-01-01")

      #fill in null values 
      temp_df_2 = temp_df.na.fill("<empty>")

      #some days the data columns has less columns then other days 
      if temp_count == 18:
        temp_df_3 = temp_df_2.drop("userid", "_c0", "acctdesc", "location", "tweetid", "coordinates")
        temp_df_3 = temp_df_3.withColumn("is_retweet", lit(None).cast('boolean')) 
        temp_df_3 = temp_df_3.withColumn("is_quote_status", lit(None).cast('boolean')) 

      elif temp_count == 29:
        temp_df_3 = temp_df_2.drop("userid", "_c0", "acctdesc", "location", "tweetid", "coordinates", "original_tweet_id", "original_tweet_userid", "original_tweet_username", "in_reply_to_status_id", "in_reply_to_screen_name", 'in_reply_to_user_id', "quoted_status_id", "quoted_status_username", "quoted_status_userid")

      else:
        print(f"Error on {day}_UATweets.csv.gz and column count {temp_count}")

      cleaned_df = cleaned_df.unionByName(temp_df_3)
      print(cleaned_df.count())


731199
953482
1201309
1310535


In [None]:
cleaned_df.show(5)

+---------------+---------+---------+-----------+-------------+--------------+------------+--------------------+--------------------+--------+--------------+----------+---------------+-----------+
|       username|following|followers|totaltweets|usercreatedts|tweetcreatedts|retweetcount|                text|            hashtags|language|favorite_count|is_retweet|is_quote_status|extractedts|
+---------------+---------+---------+-----------+-------------+--------------+------------+--------------------+--------------------+--------+--------------+----------+---------------+-----------+
|DogandCatHelpe1|        5|       39|        685|   2021-02-02|    2022-08-18|           1|Welcome to our sh...|[{'text': 'Ukrain...|      en|             3|     false|          false| 2022-08-18|
|        orfecon|       27|     3071|      15040|   2017-01-06|    2022-08-18|           0|Will the #sanctio...|[{'text': 'sancti...|      en|             0|     false|          false| 2022-08-18|
|    cgtnameric

In [None]:
#add week column for "tweetcreatedts"
cleaned_df = cleaned_df.withColumn("week",weekofyear("tweetcreatedts"))

In [None]:
cleaned_df.show(5)

+---------------+---------+---------+-----------+-------------+--------------+------------+--------------------+--------------------+--------+--------------+----------+---------------+-----------+------------------+
|       username|following|followers|totaltweets|usercreatedts|tweetcreatedts|retweetcount|                text|            hashtags|language|favorite_count|is_retweet|is_quote_status|extractedts|tweet_created_week|
+---------------+---------+---------+-----------+-------------+--------------+------------+--------------------+--------------------+--------+--------------+----------+---------------+-----------+------------------+
|DogandCatHelpe1|        5|       39|        685|   2021-02-02|    2022-08-18|           1|Welcome to our sh...|[{'text': 'Ukrain...|      en|             3|     false|          false| 2022-08-18|                33|
|        orfecon|       27|     3071|      15040|   2017-01-06|    2022-08-18|           0|Will the #sanctio...|[{'text': 'sancti...|   

In [None]:
cleaned_df.dtypes

[('username', 'string'),
 ('following', 'int'),
 ('followers', 'int'),
 ('totaltweets', 'int'),
 ('usercreatedts', 'date'),
 ('tweetcreatedts', 'date'),
 ('retweetcount', 'int'),
 ('text', 'string'),
 ('hashtags', 'string'),
 ('language', 'string'),
 ('favorite_count', 'int'),
 ('is_retweet', 'boolean'),
 ('is_quote_status', 'boolean'),
 ('extractedts', 'date'),
 ('tweet_created_week', 'int')]

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [None]:
# Store environmental variable
from getpass import getpass
password = getpass('Enter database password')
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://tweets.cnzbbvrrhst7.us-west-1.rds.amazonaws.com:5432/ua_data"
config = {"user":"uatweets", 
          "password": password, 
          "driver":"org.postgresql.Driver"}


Enter database password··········


In [None]:
# Write review_id_df to table in RDS
cleaned_df.write.jdbc(url=jdbc_url, table='tweets_table', mode=mode, properties=config)