# Analisi del consenso sul Bitcoin

Sei stato reclutato da un'azienda di ricerche di mercato per stimare il consenso delle persone verso il Bitcoin. Un team di data engineer ha estratto diversi milioni di tweet che parlano di Bitcoin, il tuo compito è quello di eseguire un'analisi del sentiment e creare un grafico che mostri come questo è variato giorno per giorno. Utilizza anche i dati per rispondere a domande interessanti, come:
- i tweet negativi hanno avuto più likes rispetto a quelli positivi?
- i tweet negativi hanno avuto più interazioni (risposte) rispetto a quelli positivi?

#### BONUS
Verifica se la variazione del sentiment è associata ad una variazione del valore del Bitcoin, per svolgere questa task devi reperire in autonomia lo storico di BTC USD

Lo svolgimento di questo progetto comincia con l'installazione delle librerie che utilizzeremo nel codice. La prima parte di codice compilato consiste nel scaricare da link il dataset che utilizzeremo, effettuare una pulizia dell'head delle colonne e dello stesso dataset, eliminando dati che non servono alle analisi quali ad esempio virgole, punti, simboli ed emoji. Successivamente viene creato il datafrake spark, in quanto nella compilazione di questo codice sarà preferito ove possibile utilizzare Apache spark, e successivamente creare una Table che verrà caricata nell'ambiente Databricks, il cui nome è bitcoin_tweet



In [0]:
pip install py spark

In [0]:
pip install pandas

In [0]:
pip install TextBlob

In [0]:
pip install emoji

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import re
import emoji

spark = SparkSession.builder.getOrCreate()

!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/bitcoin_tweets.csv

import pandas as pd
dataset = pd.read_csv('bitcoin_tweets.csv', delimiter=";", low_memory=False)

# Pulizia dei nomi delle colonne
new_column_names = [col.replace(' ', '_').replace(',', '_').replace(';', '_').replace('{', '_').replace('}', '_')
                    .replace('(', '_').replace(')', '_').replace('\n', '_').replace('\t', '_').replace('=', '_')
                    for col in dataset.columns]
dataset.columns = new_column_names

def remove_emoji(text):
    if isinstance(text, str):
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"  # emoticon
                                   u"\U0001F300-\U0001F5FF"  # simboli & pittogrammi
                                   u"\U0001F680-\U0001F6FF"  # trasporti & simboli mappe
                                   u"\U0001F1E0-\U0001F1FF"  # bandiere (iOS)
                                   u"\U00002702-\U000027B0"
                                   u"\U000024C2-\U0001F251"
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)
    else:
        return text  # Restituisce l'input se non è una stringa


# Funzione per pulire il testo
def clean_text(text):
    if text is not None:
        text = remove_emoji(text)
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return text

# Crea il DataFrame Spark
spark_df = spark.createDataFrame(dataset)

# Applica la pulizia del testo alla colonna "Text"
clean_text_udf = F.udf(clean_text)
spark_df = spark_df.withColumn("Text", clean_text_udf(spark_df["Text"]))

# Salva il DataFrame pulito come tabella
spark_df.write.saveAsTable("bitcoin_tweets", mode="overwrite")

Creiamo una funzione Python analyze_sentiment_udf che prende un testo come input, utilizza TextBlob per analizzare il sentimento di quel testo e restituisce una stringa rappresentante il sentiment (Positivo, Negativo o Neutro), e utilizziamo il metodo udf per registrare la funzione analyze_sentiment_udf, infine verrà aggiunta la colonna Sentiment al Dataframe che contiene i 3 tipi di sentimento elencati prima, e un ulteriore modifica al dataframe eliminando valori nulli nel sentiment, nel text e cambiando il nome della colonna Timestamp ,che contiene le date, che chiameremo Data e perfezioneremo il contenuto rimuovendo l'orario, dato poco utile all'analisi.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from textblob import TextBlob


# Definiamo una funzione per l'analisi del sentiment utilizzando TextBlob
def analyze_sentiment_udf(text):
    if text is not None and isinstance(text, str):
        analysis = TextBlob(text)
        sentiment_score = analysis.sentiment.polarity
        if sentiment_score > 0:
            return "Positive"
        elif sentiment_score < 0:
            return "Negative"
        else:
            return "Neutral"
    else:
        return None

# Creiamo una User Defined Function (UDF) per l'analisi del sentiment
analyze_sentiment = udf(analyze_sentiment_udf, StringType())

# Aggiungere la colonna 'sentiment' al DataFrame
spark_df = spark_df.withColumn('sentiment', analyze_sentiment(spark_df['text']))

In [0]:
from pyspark.sql.functions import when

# Sostituisce i valori nulli nella colonna "sentiment" con una stringa vuota
spark_df = spark_df.withColumn("sentiment", when(spark_df["sentiment"].isNull(), "").otherwise(spark_df["sentiment"]))

In [0]:
from pyspark.sql.functions import col
# Filtra i record con valori nulli nella colonna "text"
filtered_df = spark_df.filter(col("text").isNotNull())

In [0]:
from pyspark.sql.functions import date_format

# Aggiunge una nuova colonna "Date" con solo la data, senza l'orario
filtered_df = filtered_df.withColumn("Date", date_format(col("timestamp"), "yyyy-MM-dd"))

# Rimuove la colonna "timestamp" se non è più necessaria
filtered_df = filtered_df.drop("timestamp")

filtered_df.show(2)

Ora ci interessiamo a eseguire un'analisi del sentiment e creare un grafico che mostri come questo è variato giorno per giorno, Per rendere il grafico visualizzabile ci concentreremo su un lasso di tempo di 15 giorni del mese di  Gennaio del 2017 , dal grafico risultante osserviamo, un sentimento Neutral che varia molto, ma la più utilizzata dagli utenti basandoci sui numeri. Una costanza nei tweet negativi, il cui numerio è comunque contenuto, e una linea positiva che varia abbastanza in base alle giornate, superando in ogni caso i numeri dei tweet negativi ma mantenendo una soglia lontana dai numeri del Neutral.

In [0]:
from pyspark.sql import functions as F

# Filtra i dati relativi ai primi 15 giorni di gennaio 2017
start_date = "2017-01-01"
end_date = "2017-01-15"
filtered_df_jan_2017 = filtered_df.filter((F.col("Date") >= start_date) & (F.col("Date") <= end_date))

# Raggruppa per giorno e sentimento e conta il numero di tweet per ogni combinazione di giorno e sentimento
sentiment_counts_jan_2017 = filtered_df_jan_2017.groupBy("Date", "sentiment").count()

# Calcolo della somma dei tweet positivi, negativi e neutri per ogni giorno
positive_counts = sentiment_counts_jan_2017.filter(sentiment_counts_jan_2017.sentiment == "Positive") \
                                           .withColumnRenamed("count", "Positive_Count")
negative_counts = sentiment_counts_jan_2017.filter(sentiment_counts_jan_2017.sentiment == "Negative") \
                                           .withColumnRenamed("count", "Negative_Count")
neutral_counts = sentiment_counts_jan_2017.filter(sentiment_counts_jan_2017.sentiment == "Neutral") \
                                           .withColumnRenamed("count", "Neutral_Count")

# Unione conteggi positivi, negativi e neutri
merged_counts = positive_counts.join(negative_counts, "Date", "outer") \
                               .join(neutral_counts, "Date", "outer").na.fill(0)

# Calcolo della variazione del sentiment per ogni giorno
merged_counts = merged_counts.withColumn("Sentiment_Variation", \
                                         (F.col("Positive_Count") - F.col("Negative_Count")) / \
                                         (F.col("Positive_Count") + F.col("Negative_Count") + F.coalesce(F.col("Neutral_Count"), F.lit(0))))




In [0]:
import matplotlib.pyplot as plt

merged_counts_pd = merged_counts.toPandas()

plt.figure(figsize=(10, 6))

plt.plot(merged_counts_pd["Date"], merged_counts_pd["Positive_Count"], marker='o', linestyle='-', label="Positive", color='blue')

plt.plot(merged_counts_pd["Date"], merged_counts_pd["Negative_Count"], marker='o', linestyle='-', label="Negative", color='red')

plt.plot(merged_counts_pd["Date"], merged_counts_pd["Neutral_Count"], marker='o', linestyle='-', label="Neutral", color='green')

plt.xticks(fontsize=8, rotation=45, color='gray')

plt.title("Tweets per Day in January 2017 by Sentiment")
plt.xlabel("Date")
plt.ylabel("Tweet Count")
plt.grid(True)
plt.legend()
plt.tight_layout()

plt.show()

Successivamente facciamo un'indagine per vedere se i tweet negativi hanno avuto più likes rispetto a quelli positivi, osserveremo dalla table ottenuta che vi è una differenza sostanziale in quanto tweet positivi sono molti di più dei tweet negativi.

In [0]:
from pyspark.sql.functions import col, sum

# Selezione delle colonne "sentiment" e "likes" dal DataFrame filtrato
sent_likes = filtered_df.select("sentiment", "likes")

# Raggruppamento per "sentiment" e calcolo della somma dei "likes"
sum_likes = sent_likes.groupBy("sentiment").agg(sum("likes").alias("sum_likes"))

# Visualizzazione del DataFrame risultante
display(sum_likes)


Altra indagine è quella di occuparci di vedere se tweet negativi hanno avuto più interazioni (risposte) rispetto a quelli positivi, approccio simile ai likes, evidenzia una differenza evidente fra le varie tipologie di sentimento,con una prevalenza di Neutral, in ogni caso le "replies" Positive superano quelle Negative.

In [0]:
# Selezione delle colonne "sentiment" e "replies" dal DataFrame filtrato
sent_replies = filtered_df.select("sentiment", "replies")

# Raggruppamento per "sentiment" e calcolo della somma delle "replies"
sum_replies = sent_replies.groupBy("sentiment").agg(sum("replies").alias("sum_replies"))

# Visualizzazione del DataFrame risultante
display(sum_replies)


Infine ci occupiamo di verificare se la variazione del sentiment è associata ad una variazione del valore del Bitcoin, e lo illustriamo in un grafico. Per prima cosa creiamo un dataframes spark dal csv fonte, facciamo una breve pulizia rinominando le colonne e togliendo il punto, la percentuale e la lettera K da Volume, uniamo infine il dataframe filterd_df a bitcoin_df; a questo punto calcoliamo la media giornaliera per ogni sentiment tenendo conto che come arco temporale utilizzeremo solo Gennaio 2017 per rendere il grafico leggibile. Visualizzeremo che il sentimento positivo è associato a una variazione positiva del Bitcoin, mentre il sentimento negativo è associato a una variazione negativa del Bitcoin.Tuttavia, ci sono alcune eccezioni a questa regola. Ad esempio, il 17 gennaio il sentimento medio era negativo, ma la variazione del Bitcoin era positiva. Questo può essere dovuto ad altri fattori, come le notizie o gli eventi che hanno avuto luogo quel giorno.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, regexp_replace, to_date

bitcoin_df = spark.table("default.bitcoin_dati_storici_csv")

bitcoin_df.show(2)

In [0]:

bitcoin_df = bitcoin_df.withColumn("Data", to_date("Data", "dd.MM.yyyy"))

bitcoin_df = bitcoin_df.withColumnRenamed("Vol.", "Volume").withColumnRenamed("Var. %", "Variazione")

bitcoin_df = bitcoin_df.withColumn("Volume", regexp_replace(bitcoin_df["Volume"], "K", ""))

bitcoin_df = bitcoin_df.withColumn("Variazione", regexp_replace(bitcoin_df["Variazione"], "%", ""))


In [0]:

# Rinominiamo la colonna "Data" in "Date" nel DataFrame bitcoin_df
bitcoin_df = bitcoin_df.withColumnRenamed("Data", "Date")

# Unione DataFrame utilizzando la colonna "Date"
joined_df = bitcoin_df.join(filtered_df, bitcoin_df["Date"] == filtered_df["Date"], "inner")

# Eliminiamo una delle colonne "Date" dal DataFrame unito
joined_df = joined_df.drop(filtered_df["Date"])



In [0]:
from pyspark.sql.functions import year, month, col, when

# Filtrare i dati per gennaio
january_df = joined_df.filter((year(col("Date")) == 2017) & (month(col("Date")) == 1))

# Creazione colonne per i conteggi dei sentimenti
january_df = january_df.withColumn("Positive_Count", when(january_df["sentiment"] == "Positive", 1).otherwise(0))
january_df = january_df.withColumn("Negative_Count", when(january_df["sentiment"] == "Negative", 1).otherwise(0))
january_df = january_df.withColumn("Neutral_Count", when(january_df["sentiment"] == "Neutral", 1).otherwise(0))

# Raggruppare per giorno e calcolare la somma dei conteggi dei sentimenti
sentiment_counts = january_df.groupBy("Date").sum("Positive_Count", "Negative_Count", "Neutral_Count")

# Calcolo media dei conteggi per ogni sentimento
sentiment_averages = sentiment_counts.withColumn("Avg_Positive", col("sum(Positive_Count)") / 31)
sentiment_averages = sentiment_averages.withColumn("Avg_Negative", col("sum(Negative_Count)") / 31)
sentiment_averages = sentiment_averages.withColumn("Avg_Neutral", col("sum(Neutral_Count)") / 31)

# Mostrare il DataFrame risultante
sentiment_averages.show(4)

In [0]:
import matplotlib.pyplot as plt

january_sentiment_averages = sentiment_averages.filter(sentiment_averages["Date"].startswith("2017-01")) \
                                                .orderBy("Date")
dates = [str(row["Date"]).split("-")[-1] for row in january_sentiment_averages.collect()]
positive_avg = [row["Avg_Positive"] for row in january_sentiment_averages.collect()]
negative_avg = [row["Avg_Negative"] for row in january_sentiment_averages.collect()]
neutral_avg = [row["Avg_Neutral"] for row in january_sentiment_averages.collect()]

january_bitcoin_df = bitcoin_df.filter(bitcoin_df["Date"].startswith("2017-01")) \
                               .orderBy("Date")
bitcoin_dates = [str(row["Date"]).split("-")[-1] for row in january_bitcoin_df.select("Date").collect()]
bitcoin_variation = [float(row["Variazione"].replace(",", ".")) for row in january_bitcoin_df.select("Variazione").collect()]

plt.figure(figsize=(10, 6))
plt.plot(dates, positive_avg, label='Positive Sentiment', marker='o')
plt.plot(dates, negative_avg, label='Negative Sentiment', marker='o')
plt.plot(dates, neutral_avg, label='Neutral Sentiment', marker='o')
plt.plot(bitcoin_dates, bitcoin_variation, label='Bitcoin Variation', marker='o')
plt.xlabel('Day of January 2017')
plt.ylabel('Average Sentiment / Bitcoin Variation')
plt.title('Average Sentiment and Bitcoin Variation in January 2017')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
