# In [0]:
# COVID-19 tweet preprocessing code
# Code authors: Rajdeep and Jeevan

from functools import partial
import pycountry
from pyspark.sql import SparkSession
import re
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.sql.functions import split, col, concat_ws
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
from pyspark.ml.feature import StopWordsRemover

# set spark conf
# NOTE(review): `spark` is not created in this file; presumably the notebook
# runtime (e.g. Databricks) provides the session -- confirm.
# NOTE(review): both calls below pass a single empty string. They look like
# redacted credentials (presumably the storage-account setting needed for the
# wasbs:// container read later). PySpark's RuntimeConfig.set expects a key AND
# a value, so fill both in before running.
spark.conf.set(
  "")
spark.conf.set(
  "")

# remove unwanted values from text
def process_text(string):
    """Build a Spark column expression that normalises raw tweet text.

    The text is lower-cased, then links, dates, @usernames, digits and a fixed
    set of punctuation are removed. Line breaks and tabs are turned into
    spaces rather than deleted, so the words on either side of a line break
    stay separate tokens. Runs of spaces left behind by the removals are
    collapsed and the ends trimmed, so a later split on " " yields no empty
    tokens.

    :param string: Column holding the raw tweet text.
    :return: Column with the cleaned text.
    """
    # Raw strings: the patterns go to Spark's (Java) regex engine verbatim and
    # avoid Python's invalid-escape warnings.
    string = F.lower(string)
    string = F.regexp_replace(string, r"https?://\S+", "")  # links
    string = F.regexp_replace(string, r"[\r\n\t]+", " ")  # CR, LF and tab -> space
    string = F.regexp_replace(string, r"(?:[0-9]{2}[:/,]){2}[0-9]{2,4}", "")  # dates
    string = F.regexp_replace(string, r"@[A-Za-z0-9_]+", "")  # usernames
    string = F.regexp_replace(string, r"[0-9]", "")  # numbers
    string = F.regexp_replace(string, r'[:/#.?!&",]', "")  # symbols
    string = F.regexp_replace(string, r" {2,}", " ")  # collapse repeated spaces
    return F.trim(string)

# match location of user
def add_true_location(user_location, country_names=None):
    """Return the country named in a free-text user location, or ''.

    Matching is case-insensitive and on whole words only, so "Indiana" does
    not count as "India" and "Nigeria" does not count as "Niger". When several
    country names occur in the location, the longest one wins. For example, a
    location mentioning both "Jersey" and "United States" maps to
    "United States".

    :param user_location: location string from the user's profile; may be
        None (a null Spark value) or empty.
    :param country_names: candidate names to look for; defaults to the names
        in ``pycountry.countries``.
    :return: the matched country name, or '' when nothing matches.
    """
    if not user_location:
        # null/empty locations would otherwise fail on .upper()
        return ''
    if country_names is None:
        country_names = [country.name for country in pycountry.countries]
    location = user_location.upper()
    matches = [
        name for name in country_names
        # (?<!\w)/(?!\w) rather than \b so that names ending in punctuation
        # (e.g. "U.S.") still match at the end of the string
        if re.search(r"(?<!\w)" + re.escape(name.upper()) + r"(?!\w)", location)
    ]
    return max(matches, key=len) if matches else ''

# extract date
def to_date(array):
    """Build a "MMMddyyyy" string from a space-split ``created_at`` value.

    Tokens 1 and 2 are taken as the month abbreviation and the day. The year
    is token 5 when the array has a numeric one there (assumes Twitter's
    "EEE MMM dd HH:mm:ss Z yyyy" layout -- confirm against the data).
    Otherwise it falls back to "2020".

    :param array: list of string tokens, or None for a null ``created_at``.
    :return: e.g. "Mar042020", or None when the input is missing or too short.
        Spark then produces a null date instead of failing the task.
    """
    if array is None or len(array) < 3:
        return None
    year = array[5] if len(array) > 5 and array[5].isdigit() else "2020"
    return array[1] + array[2] + year

# Wrap the plain-Python helpers as Spark UDFs (no returnType is given, so the
# default string type applies) and also register each one under the same
# name so it can be called from Spark SQL.
udf_func = F.udf(add_true_location)
spark.udf.register("udf_func", udf_func)

udf_func2 = F.udf(to_date)
spark.udf.register("udf_func2", udf_func2)

# Load the custom stop-word list. spark.read.text yields one row per line in
# a column named "value"; collect those rows to the driver as a plain Python
# list (see https://mungingdata.com/pyspark/column-to-list-collect-tolocaliterator/).
# NOTE(review): the path is empty here -- presumably redacted; fill it in.
stopWordsDf = spark.read.text("")
stopWords = [row.value for row in stopWordsDf.select('value').collect()]

# Transformer that drops those words from the tokenised "full_text" column
# and writes the surviving tokens to a new "filtered" column.
remover = StopWordsRemover(
    inputCol="full_text",
    outputCol="filtered",
    stopWords=stopWords,
)


# Configure which month, and which days of it, to preprocess.
#   month:    "feb", "mar" or "apr" -- also the name of the blob container
#   startDay/endDay: inclusive day range; presumably every day in the range
#             must have an input file (29 days in feb 2020, 30 in apr)
startDay = 1
endDay = 31
month = "mar"
# Month number used in the input file names. It is derived from `month` so the
# two settings cannot drift apart (an unsupported month raises KeyError).
monthNumber = {"feb": "2", "mar": "3", "apr": "4"}[month]

# Countries to keep. The values must equal the names add_true_location returns,
# i.e. pycountry's country names. pycountry calls Russia "Russian Federation",
# so a bare "Russia" entry could never match.
countries = ["India", "Hong Kong", "United States", "Canada", "United Kingdom",
             "Malaysia", "Singapore", "France", "Philippines", "Thailand",
             "Japan", "China", "Germany", "Russian Federation", "Sweden"]

# The month's blob container holds both the raw daily files and the output.
container_url = "wasbs://" + month + "@sentimentdatacs4225.blob.core.windows.net/"

# for each day ...
for i in range(startDay, endDay + 1):

    # read in one day of tweets
    filename = 'geo_2020-0' + monthNumber + '-' + str(i) + '.jsonl'
    df = spark.read.format("json").load(container_url + filename)

    # keep only the needed fields, and only English tweets; filtering this
    # early means the later stages (including the Python UDFs) see English
    # rows only
    df = df.select(
        col("created_at"),
        col("full_text"),
        col("lang"),
        col("user")["location"].alias("user_location"),
    ).filter(col("lang") == "en")

    # clean the text and split it into a token array
    df = df.withColumn("full_text", process_text(col("full_text")))
    df = df.withColumn("full_text", split(col("full_text"), " "))

    # derive a country from the free-text user location and keep only the
    # countries of interest (an unmatched location is '', which isin rejects)
    df = df.withColumn("full_address", udf_func(col("user_location")))
    df = df.filter(col("full_address").isin(countries))

    # created_at -> space-split tokens -> "MMMddyyyy" string -> date
    df = df.withColumn("created_at", split(col("created_at"), "\\ "))
    df = df.withColumn("created_at", udf_func2(col("created_at")))
    df = df.withColumn("created_at", F.to_date(col("created_at"), "MMMddyyyy"))

    # remove stopwords, then join the remaining tokens back into one string
    # (https://sparkbyexamples.com/pyspark/pyspark-convert-array-column-to-string-column/)
    df = remover.transform(df)
    df = df.withColumn("filtered", concat_ws(" ", col("filtered")))

    # keep the output fields and write each day's result to its own .csv path
    df = df.select("created_at", "lang", "full_address", "filtered")
    df.write.format("csv").save(container_url + month + "-" + str(i) + ".csv")
    print("completed: " + month + str(i))