In [1]:
import pandas as pd
import re

In [2]:
data_tweet = pd.read_csv('data_tweet_music.csv')
data_tweet.head()

Unnamed: 0,User,Tweet
0,pjs814,RT @Melissa04899261: @Jiminlove3000 @BTS_twt B...
1,pjmmycatdog,RT @Marvicky24: BE GOOD BE LIKE JIMIN\nWITH YO...
2,chejiminie5813,RT @PJM_data: Jimin's Twitter Trends | Worldwi...
3,mlcTcdAX6jOfKC0,RT @PJMsMINI13: @charming1023 @BTS_twt My favo...
4,gortizpolly,RT @DeansMyHero: Jensen taking over the trendi...


In [7]:
# data cleaning
def punctuation(txt):
    result = re.sub(r'http\S+','',txt)
    result = re.sub(r'@\w+','',txt)
    result = re.sub(r'#','',txt)
    result = re.sub(r'RT','',txt)
    result = re.sub(r':','',txt)
    return result

In [30]:
data_tweet['Tweet'] = data_tweet['Tweet'].apply(punctuation)

In [32]:
data_tweet['Tweet']

0        melissa04899261 jiminlove3000 bs_twt be good ...
1        marvicky24 be good be like jimin wih you  jim...
2        pjm_data jimins witter rends  worldwide    be...
3        pjmsmini13 charming1023 bs_twt my favorite bi...
4        deansmyhero jensen taking over the trending t...
                              ...                        
9995     domonchain guess whos trending again on coing...
9996    im listening to bs_butter by bs for my rending...
9997    im listening to the hot trending song withyou ...
9998    exile  kiss you all over on httpstcowiiin7wgod...
9999     solecitopjm seriev46 bs_twt blackpink be good...
Name: Tweet, Length: 10000, dtype: object

In [52]:
def punctuation(txt):
    result = re.sub(r'[^\w\s]','',txt)
    result = re.sub(r'http\S+','',txt)
    result = re.sub(r'@\w+','',txt)
    result = re.sub(r'#','',txt)
    result = re.sub(r'RT','',txt)
    result = re.sub(r':','',txt)
    return result

In [43]:
def preprocessing(lines):
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    words = words.withColumn('word', F.regexp_replace('word', 'RT', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    return words

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

In [70]:
def preprocessing(lines):
    words = lines.na.replace('', None)
    words = words.na.drop()
    words = words.withColumn('Tweet', F.regexp_replace('Tweet', r'http\S+', ''))
    words = words.withColumn('Tweet', F.regexp_replace('Tweet', '@\w+', ''))
    words = words.withColumn('Tweet', F.regexp_replace('Tweet', '#', ''))
    words = words.withColumn('Tweet', F.regexp_replace('Tweet', 'RT', ''))
    words = words.withColumn('Tweet', F.regexp_replace('Tweet', ':', ''))
    return words

In [74]:
# text classification
def polarity_detection(text):
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    return TextBlob(text).sentiment.subjectivity
def text_classification(words):
    # polarity detection
    polarity_detection_udf = udf(polarity_detection, StringType())
    words = words.withColumn("polarity", polarity_detection_udf("Tweet"))
    # subjectivity detection
    subjectivity_detection_udf = udf(subjectivity_detection, StringType())
    words = words.withColumn("subjectivity", subjectivity_detection_udf("Tweet"))
    return words

In [36]:
spark = SparkSession.builder \
    .appName("Sentiment_Analysis") \
    .getOrCreate()

In [59]:
df = spark.read.load("data_tweet_music.csv",
                     format="csv", inferSchema="true", header="true")

In [61]:
df.show(5,True)
df.printSchema()

+--------------------+--------------------+
|                User|               Tweet|
+--------------------+--------------------+
|              pjs814|RT @Melissa048992...|
|I’m listening to ...|                null|
|         pjmmycatdog|RT @Marvicky24: B...|
|   WITH YOU 💙 JIMIN|                null|
|I’m listening to ...|                null|
+--------------------+--------------------+
only showing top 5 rows

root
 |-- User: string (nullable = true)
 |-- Tweet: string (nullable = true)



In [65]:
words = df['Tweet']
words.show(5,True)

TypeError: 'Column' object is not callable

In [71]:
words = preprocessing(df)

In [75]:
words = text_classification(words)
words = words.repartition(1)


In [76]:
query = words.writeStream.queryName("all_tweets")\
        .outputMode("append").format("parquet")\
        .option("path", "./parc")\
        .option("checkpointLocation", "./check")\
        .trigger(processingTime='60 seconds').start()
query.awaitTermination()

AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame

In [79]:
words.show()

+---------------+--------------------+--------------------+-------------------+
|           User|               Tweet|            polarity|       subjectivity|
+---------------+--------------------+--------------------+-------------------+
|         pjs814|    BE GOOD BE LI...|                 0.7| 0.6000000000000001|
|    pjmmycatdog|  BE GOOD BE LIKE...|                 0.7| 0.6000000000000001|
| chejiminie5813|  Jimin's Twitter...|                 0.0|                0.0|
|mlcTcdAX6jOfKC0|    My favorite B...|                0.25|               0.75|
|    gortizpolly|  Jensen taking o...| 0.07499999999999998| 0.7250000000000001|
|      MommyLouV|  ChickenNoodleSo...|                 0.0|                0.0|
|      YxinBts13|    Forever in lo...|               0.375| 0.7250000000000001|
|Korazon24698179|  We are trying/e...|                0.25| 0.5916666666666667|
|      951013sea|     I’m listenin...|                0.25| 0.8500000000000001|
|0s6nV4UuDikw11A|    I’m listening...|  