In [1]:
from cassandra.cluster import Cluster
import csv

In [3]:
from pyspark.sql import SparkSession

# import csv hadoop spark

In [4]:

spark = SparkSession.builder.appName("ca2_nov").getOrCreate()

In [5]:
folder = "hdfs://localhost:9000/user1/ProjectTweets.csv"

In [6]:
df = spark.read.csv(folder, header=False, inferSchema=True)

                                                                                

In [7]:
df = df.toDF("id", "indice", "date", "query", "user", "tweet")

In [8]:
df.head()

Row(id=0, indice=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', query='NO_QUERY', user='_TheSpecialOne_', tweet="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D")

In [9]:
df.printSchema()


root
 |-- id: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)



In [10]:
df = df.dropDuplicates(["tweet"])



In [11]:
# using legacy for date parsing
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")



In [12]:
from pyspark.sql.functions import to_date

In [13]:
# trasform date column from string to date
df_con_data = df.withColumn("date", to_date(df["date"], "E MMM dd HH:mm:ss z yyyy"))


In [21]:
df_con_data.show(1)

[Stage 5:>                                                          (0 + 1) / 1]

+-------+----------+----------+--------+--------+--------------------+
|     id|    indice|      date|   query|    user|               tweet|
+-------+----------+----------+--------+--------+--------------------+
|1429354|2059729483|2009-06-07|NO_QUERY|mtiishaw|       Hardest wo...|
+-------+----------+----------+--------+--------+--------------------+
only showing top 1 row



                                                                                

In [22]:
# check on null values
df_con_data.describe().show()




+-------+-----------------+--------------------+--------+--------------------+--------------------+
|summary|               id|              indice|   query|                user|               tweet|
+-------+-----------------+--------------------+--------+--------------------+--------------------+
|  count|          1581466|             1581466| 1581466|             1581466|             1581466|
|   mean|799948.3493486423|1.9985071970856838E9|    null| 4.418920826240876E9|                null|
| stddev|462023.2915345735|1.9365969167076474E8|    null|5.218769198801353...|                null|
|    min|                0|          1467810369|NO_QUERY|        000catnap000|                 ...|
|    max|          1599999|          2329205794|NO_QUERY|          zzzzeus111|ï¿½ï¿½ï¿½ï¿½ï¿½ß§...|
+-------+-----------------+--------------------+--------+--------------------+--------------------+



                                                                                

In [14]:
df_con_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date: date (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)



# import in mysql

In [17]:
import mysql.connector


In [18]:
# info for connection
hostname = "127.0.0.1"  
username = "root"  
password = "password"  
database_name = "ca2"  

# connection to database
conn = mysql.connector.connect(
    host=hostname,
    user=username,
    password=password,
    database=database_name
)


In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("ca2_nov") \
    .config("spark.jars", "hdfs:///mysql-connector-j-8.2.0.jar") \
    .getOrCreate()


In [20]:

url = "jdbc:mysql://127.0.0.1:3306/ca2"
properties = {
    "user": "root",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"
}





In [21]:
# write df spark in mysql
df_con_data.write.jdbc(url, "tweet", mode="append", properties=properties)

                                                                                

In [23]:
query = "SELECT * FROM tweet LIMIT 10"



In [25]:
query

'SELECT * FROM tweet LIMIT 10'

# import in cassandra

In [15]:
# node address
indirizzo_del_nodo_di_contatto = '127.0.0.1'

# cluster object
cluster = Cluster([indirizzo_del_nodo_di_contatto])

In [16]:
# cassandra session
session = cluster.connect()

# keyspace
session.execute("USE my_ca3")

<cassandra.cluster.ResultSet at 0x7fa2503d2470>

In [19]:
tweet_ca2 = """


CREATE TABLE IF NOT EXISTS my_ca3.tweet_ca2 (
    id INT PRIMARY KEY,
    indice BIGINT,
    date DATE,
    query TEXT,
    user TEXT,
    tweet TEXT
)



"""

session.execute(tweet_ca2)

<cassandra.cluster.ResultSet at 0x7fac188da770>

In [24]:
table_name = "tweet_ca2"

df_con_data.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table=table_name, keyspace="my_ca3") \
    .option("confirm.truncate", "true") \
    .mode("overwrite") \
    .save()

                                                                                

In [26]:
query = "SELECT COUNT(*) FROM tweet_ca2"
result = session.execute(query)


In [27]:
count = result.one()[0]
print(f"Total number of rows is : {count}")

Total number of rows is : 1581466


In [7]:
query = "SELECT * FROM tweet_ca2"
result = session.execute(query)

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame


# cleaning data for sentiment analysis 

In [17]:
from pyspark.sql.functions import udf

In [18]:
from pyspark.sql.types import FloatType

In [19]:
from nltk.tokenize import word_tokenize
import string



In [20]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /home/hduser/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [21]:
from pyspark.sql.types import StringType

In [22]:
def clean_data(text):
    tokens = word_tokenize(text)
    tokens = [word for word in tokens if word.isalpha()]
    clean_text = ' '.join(tokens)
    return clean_text




In [23]:
clean_data_utf = udf(clean_data, StringType())

In [24]:
df_cleaned = df_con_data.withColumn("tweet_cleaned", clean_data_utf("tweet"))


In [25]:
df_cleaned.show(1)

[Stage 5:>                                                          (0 + 1) / 1]

+-------+----------+----------+--------+--------+--------------------+--------------------+
|     id|    indice|      date|   query|    user|               tweet|       tweet_cleaned|
+-------+----------+----------+--------+--------+--------------------+--------------------+
|1429354|2059729483|2009-06-07|NO_QUERY|mtiishaw|       Hardest wo...|Hardest working c...|
+-------+----------+----------+--------+--------+--------------------+--------------------+
only showing top 1 row



                                                                                

# sentiment analysis with Vader

In [26]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [27]:
def sentiment_function(text):
    analyzer = SentimentIntensityAnalyzer()
    sent = analyzer.polarity_scores(text)
    return sent['compound']  #  'polarity_scores'function gives a value stored in column named 'compound'


In [28]:


# record utf
sentiment_udf = udf(sentiment_function, FloatType())

# Applica l'UDF per calcolare il punteggio "compound" e creare una nuova colonna "sentiment"
df_vader = df_cleaned.withColumn("sentiment", sentiment_udf(df_cleaned["tweet_cleaned"]))


In [29]:
df_vader.printSchema()

root
 |-- id: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date: date (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tweet_cleaned: string (nullable = true)
 |-- sentiment: float (nullable = true)



In [36]:
df_vader.show(1)

2023-11-10 17:49:00,336 WARN python.PythonUDFRunner: Detected deadlock while completing task 0.0 in stage 17 (TID 18): Attempting to kill Python Worker
                                                                                

+-------+----------+----------+--------+--------+--------------------+--------------------+---------+
|     id|    indice|      date|   query|    user|               tweet|       tweet_cleaned|sentiment|
+-------+----------+----------+--------+--------+--------------------+--------------------+---------+
|1429354|2059729483|2009-06-07|NO_QUERY|mtiishaw|       Hardest wo...|Hardest working c...|   0.2732|
+-------+----------+----------+--------+--------+--------------------+--------------------+---------+
only showing top 1 row



In [30]:
from pyspark.sql.functions import when, col


In [31]:
# new column 'label'
df_vader = df_vader.withColumn("label_vader", 
    when(col("sentiment") > 0.2, 1)
    .when(col("sentiment") < -0.2, -1)
    .otherwise(0)
)




In [32]:
df_vader.printSchema()

root
 |-- id: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date: date (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tweet_cleaned: string (nullable = true)
 |-- sentiment: float (nullable = true)
 |-- label_vader: integer (nullable = false)



In [40]:
label_counts_vader = df_vader.groupBy("label_vader").count()

# sentiment analysis with TextBlob

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType
from textblob import TextBlob


In [34]:
def analyze_sentiment(text):
    analysis = TextBlob(text)
    sentiment_score = analysis.sentiment.polarity
    return float(sentiment_score)

sentiment_udf = udf(analyze_sentiment, FloatType())



In [35]:
df_blob = df_cleaned.withColumn("sentiment", sentiment_udf(df_cleaned["tweet_cleaned"]))


In [36]:
df_blob.printSchema()

root
 |-- id: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date: date (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tweet_cleaned: string (nullable = true)
 |-- sentiment: float (nullable = true)



In [37]:
from pyspark.sql.functions import when, col

df_blob = df_blob.withColumn("label_blob", 
    when(col("sentiment") > 0.2, 1)
    .when(col("sentiment") < -0.2, -1)
    .otherwise(0)
)


In [38]:
df_blob = df_blob.withColumnRenamed("sentiment", "sentiment_blob")

In [39]:
df_blob = df_blob.withColumnRenamed("date", "date_blob")

In [40]:
df_blob = df_blob.withColumnRenamed("id", "id_blob")

In [41]:
df_vader = df_vader.withColumnRenamed("sentiment", "sentiment_vader")

In [42]:

merged_df = df_blob.join(df_vader, 'tweet_cleaned', 'inner')



In [43]:
merged_df.printSchema()

root
 |-- tweet_cleaned: string (nullable = true)
 |-- id_blob: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date_blob: date (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- sentiment_blob: float (nullable = true)
 |-- label_blob: integer (nullable = false)
 |-- id: integer (nullable = true)
 |-- indice: long (nullable = true)
 |-- date: date (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- sentiment_vader: float (nullable = true)
 |-- label_vader: integer (nullable = false)



In [44]:
# Seleziona solo le colonne desiderate utilizzando alias
result_df = merged_df.select(
   
    "sentiment_blob",
    "date",
    "label_blob",
    "label_vader",
    "sentiment_vader",
    "id"
)



In [45]:
result_df.printSchema()

root
 |-- sentiment_blob: float (nullable = true)
 |-- date: date (nullable = true)
 |-- label_blob: integer (nullable = false)
 |-- label_vader: integer (nullable = false)
 |-- sentiment_vader: float (nullable = true)
 |-- id: integer (nullable = true)



In [46]:
sentiment = """


CREATE TABLE IF NOT EXISTS my_ca3.sentiment (
    id INT PRIMARY KEY,
    sentiment_blob FLOAT,
    date DATE,
    label_blob INT,
    label_vader INT,
    sentiment_vader FLOAT
)



"""

session.execute(sentiment)

<cassandra.cluster.ResultSet at 0x7fa222037b80>

In [None]:
result_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="sentiment", keyspace="my_ca3") \
    .option("confirm.truncate", "true") \
    .mode("overwrite") \
    .save()

[Stage 11:>                                                         (0 + 2) / 3]