In [1]:
import re
import math
import string
from unidecode import unidecode
from nltk.corpus import stopwords

import numpy as np
import pandas as pd
from scipy import stats
import plotly.express as px
from datetime import datetime
import matplotlib.pyplot as plt

from pyspark import SparkConf
from pyspark.sql import SparkSession # Spark SQL
from pyspark.sql.functions import date_format, to_date
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, TimestampType

import pyspark.sql.functions as F
from pyspark.sql.functions import udf

# Local Spark session; "local[10]" asks Spark to run in-process with 10 worker threads.
master = "local[10]"
app_name = "Parallel Join"
spark_conf = SparkConf()
spark_conf.setMaster(master)
spark_conf.setAppName(app_name)
# Optional tuning the author left disabled: .set("spark.executor.memory", "6g")
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

# Do not truncate long string cells when pandas renders a frame.
pd.options.display.max_colwidth = None
# NOTE: this rebinds the imported `stopwords` corpus reader to a plain list of English stop words.
stopwords = stopwords.words("english")

23/08/07 19:46:03 WARN Utils: Your hostname, crarojasca-Blade-14-RZ09-0370 resolves to a loopback address: 127.0.1.1; using 192.168.1.9 instead (on interface wlp2s0)
23/08/07 19:46:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/07 19:46:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# --- Raw tweet export: tab-separated, first row is the header -----------------
file = "../datasets/hamburg/omm_export_tweets_01-06-2022_v2.csv"
data = spark.read.option("delimiter", "\t").csv(file, header=True)
data.printSchema()


def _prediction_schema(id_col, label_col, score_col):
    """Build the (id, label, score) schema shared by the headerless prediction files."""
    return StructType([
        StructField(id_col, StringType(), True),
        StructField(label_col, StringType(), True),
        StructField(score_col, FloatType(), True),
    ])


# --- Prediction files: pipe-separated, no header, explicit schema -------------
schema = _prediction_schema("id_cards", "cards_pred", "cards_pred_score")
file = "../datasets/predictions"
predictions = spark.read.option("delimiter", "|").csv(file, header=False, schema=schema)
predictions.printSchema()

schema = _prediction_schema(
    "id_waterloo_cards", "waterloo_cards_pred", "waterloo_cards_score"
)
file = "../datasets/predictions_waterloo_cards"
new_predictions = spark.read.option("delimiter", "|").csv(file, header=False, schema=schema)
new_predictions.printSchema()

schema = _prediction_schema(
    "id_hwaterloo_cards", "hwaterloo_cards_pred", "hwaterloo_cards_score"
)
file = "../datasets/hamburg/hamburg_secondlvl_predictions"
secondlvl_predictions = spark.read.option("delimiter", "|").csv(file, header=False, schema=schema)
secondlvl_predictions.printSchema()

# --- Attach predictions to tweets by tweet id ---------------------------------
# The first two prediction sets are inner-joined (tweets missing either are
# dropped); the second-level set is left-joined, so tweets without one are kept
# with null prediction columns. The per-file id columns are redundant afterwards.
data = (
    data
    .join(predictions, on=data.id == predictions.id_cards, how="inner")
    .join(new_predictions, on=data.id == new_predictions.id_waterloo_cards, how="inner")
    .join(
        secondlvl_predictions,
        on=data.id == secondlvl_predictions.id_hwaterloo_cards,
        how="left",
    )
    .drop("id_cards", "id_waterloo_cards", "id_hwaterloo_cards")
)

def proccess(value):
    """Map a second-level class index to its category code.

    Args:
        value: Index into the category table, given as an int or as a
            numeric string. ``None`` or an empty string means that no
            prediction is available.

    Returns:
        The category code at that index (``"1_1"`` for index 0), or
        ``"0_0"`` when ``value`` is missing.

    Raises:
        ValueError: If ``value`` cannot be parsed as an integer.
        IndexError: If the parsed index is out of range for the table.
    """
    categories = (
        '1_1', '1_2', '1_3', '1_4', '1_6', '1_7', '2_1', '2_3',
        '3_1', '3_2', '3_3', '4_1', '4_2', '4_4', '4_5', '5_1', '5_2',
    )
    # Test for "missing" explicitly: a plain truthiness check would also
    # treat the valid integer index 0 as missing and return "0_0".
    if value is None or value == "":
        return "0_0"
    return categories[int(value)]


# Wrap the index -> category-code mapper as a Spark UDF (string result) and
# overwrite the raw index column with the mapped code.
f = udf(proccess, returnType=StringType())
data = data.withColumn("hwaterloo_cards_pred", f(F.col("hwaterloo_cards_pred")))

@udf(returnType=TimestampType())
def generateDate(year, month, day):
    """Combine year, month and day strings into a timestamp (Spark UDF).

    Args:
        year: Year component as a string, e.g. ``"2022"``.
        month: Month component as a string.
        day: Day-of-month component as a string.

    Returns:
        A ``datetime`` at midnight of the given date, or ``None`` when any
        component is missing/empty or the parts do not form a valid date.
    """
    if not year or not month or not day:
        return None
    try:
        return datetime.strptime("-".join((year, month, day)), "%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError: a component is not a string; ValueError: not a valid
        # date. Anything else is unexpected and should surface, not be hidden.
        return None

# Normalise the date parts to strings, assemble them into a timestamp, render
# that timestamp as "yyyy-MM-dd HH:mm:ss" text, and finally keep a single row
# per (id, fulltext) pair.
data = (
    data
    .withColumn("year", F.col("year").cast(StringType()))
    .withColumn("month", F.col("month").cast(StringType()))
    .withColumn("day", F.col("day").cast(StringType()))
    .withColumn("date", generateDate(F.col("year"), F.col("month"), F.col("day")))
    .withColumn("date", date_format("date", "yyyy-MM-dd HH:mm:ss"))
    .dropDuplicates(["id", "fulltext"])
)

root
 |-- id: string (nullable = true)
 |-- username: string (nullable = true)
 |-- fulltext: string (nullable = true)
 |-- 140_char_text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- url: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- favorite_count: string (nullable = true)
 |-- language: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_time_zone: string (nullable = true)
 |-- user_follower_count: string (nullable = true)
 |-- user_favorite_count: string (nullable = true)
 |-- user_tweet_count: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- media: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: 

In [3]:
# Previously labelled samples. The second file names its text column
# "fulltext_o"; rename it to "fulltext" before stacking the two frames.
sample_labeled_1 = pd.read_csv("../datasets/hamburg/hamburg_1000_sample.csv")
sample_labeled_2 = pd.read_csv(
    "../datasets/hamburg/John_hamburg_test_set2.csv"
).rename(columns={"fulltext_o": "fulltext"})

sample_tagged = pd.concat([sample_labeled_1, sample_labeled_2], axis=0)
sample_tagged.shape

(2004, 40)

In [4]:
# Count tweets per second-level label, folding missing labels into "0_0".
# The null test has to be isNull(): `col == None` is translated to the SQL
# comparison `= NULL`, which never evaluates to true.
label_col = F.col("hwaterloo_cards_pred")
counts = (
    data
    .withColumn(
        "hwaterloo_cards_pred",
        F.when(label_col.isNull(), "0_0").otherwise(label_col),
    )
    .groupby("hwaterloo_cards_pred")
    .agg(F.count(F.col("id")).alias("count"))
    .toPandas()
)
counts = counts.sort_values("hwaterloo_cards_pred").set_index("hwaterloo_cards_pred")
# Exclude the "no label" bucket by name instead of relying on it being the
# first row after sorting; .copy() avoids assigning new columns on a slice.
counts = counts.loc[counts.index != "0_0"].copy()
counts["perc"] = counts["count"] / counts["count"].sum()
# Proportional sample size per label for a 1000-tweet sample, rounded up.
counts["nsamples"] = (counts["perc"] * 1000).apply(math.ceil)

                                                                                

In [5]:
# Per-label tweet count, share of the total count, and proportional sample size.
counts

Unnamed: 0_level_0,count,perc,nsamples
hwaterloo_cards_pred,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1_1,2546,0.003094,4
1_2,5894,0.007162,8
1_3,8633,0.01049,11
1_4,6381,0.007753,8
1_6,3549,0.004312,5
1_7,35170,0.042735,43
2_1,69923,0.084962,85
2_3,21928,0.026644,27
3_1,777,0.000944,1
3_2,9386,0.011405,12


In [12]:
# Draw up to `n` not-yet-tagged tweets for every second-level label.
n = 50
frames = []
for label in counts.index:
    # Over-draw (at most 5*n rows) so that up to n rows are left once tweets
    # already present in `sample_tagged` are removed.
    # The seed pins the sampler's RNG; the drawn rows can still change if the
    # upstream row order or partitioning changes, and limit() takes whichever
    # rows arrive first.
    candidates = (
        data.filter(F.col("hwaterloo_cards_pred") == label)
        .sample(fraction=0.2, seed=42)
        .limit(5 * n)
        .toPandas()
    )
    sample = candidates[~candidates["fulltext"].isin(sample_tagged.fulltext)][:n]
    print(label, sample.shape)
    frames.append(sample)

# Concatenate once at the end instead of growing a frame inside the loop.
sample2tag = pd.concat(frames) if frames else pd.DataFrame()

                                                                                

1_1 (50, 32)


                                                                                

1_2 (50, 32)


                                                                                

1_3 (50, 32)


                                                                                

1_4 (50, 32)


                                                                                

1_6 (50, 32)




23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
23/08/07 20:30:36 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.


                                                                                

1_7 (50, 32)


                                                                                

2_1 (50, 32)


                                                                                

2_3 (50, 32)


                                                                                

3_1 (50, 32)


                                                                                

3_2 (50, 32)


                                                                                

3_3 (50, 32)


                                                                                

4_1 (50, 32)


                                                                                

4_2 (50, 32)


                                                                                

4_4 (50, 32)


                                                                                

4_5 (50, 32)


                                                                                

5_1 (50, 32)




5_2 (50, 32)




In [14]:
# Rows and columns collected for tagging across all labels.
sample2tag.shape

(850, 32)

In [17]:
# Sanity check: the row count stays the same if no tweet text was sampled twice.
sample2tag.drop_duplicates("fulltext").shape

(850, 32)



In [18]:
# Export only the tweet id and text of the collected sample.
# NOTE(review): a later cell writes to this same path again and overwrites this file.
sample2tag.loc[:, ["id", "fulltext"]].to_csv("sample_categories3.csv", index=False)



In [19]:
# Draw 150 tweets whose second-level label is "0_0".
# The seed pins the sampler's RNG so a re-run can reproduce the draw; the
# result may still change if the upstream row order or partitioning changes.
# NOTE(review): unlike the per-label loop, tweets already present in
# `sample_tagged` are not excluded here; confirm that is intended.
sample = (
    data.filter(F.col("hwaterloo_cards_pred") == "0_0")
    .sample(fraction=0.2, seed=42)
    .limit(150)
    .toPandas()
)



In [21]:
# Shape of the "0_0" sample drawn in the previous cell.
sample.shape

(150, 32)

In [22]:
# Append the "0_0" sample to the per-label samples.
# NOTE(review): this rebinds `sample2tag` to itself plus `sample`, so running
# the cell more than once appends the "0_0" rows again each time.
sample2tag = pd.concat([sample2tag, sample])

In [28]:
# Write the id/text columns in shuffled row order (sample(frac=1) is a full
# permutation of the rows). A fixed random_state makes the exported order
# reproducible across re-runs. This overwrites the earlier export at this path.
(
    sample2tag[["id", "fulltext"]]
    .sample(frac=1, random_state=42)
    .to_csv("sample_categories3.csv", index=False)
)

