Data Collection

In [None]:
!pip install tweepy pyspark nltk airflow


In [5]:
#Configure Twitter API
import tweepy
import json

# Replace these with your own Twitter API keys and tokens
consumer_key = 'your_consumer_key'
consumer_secret = 'your_consumer_secret'
access_token = 'your_access_token'
access_token_secret = 'your_access_token_secret'

# Set up tweepy authentication
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True)

# Collect tweets with a specific hashtag
hashtag = "#competition"
tweets = tweepy.Cursor(api.search_tweets, q=hashtag, lang="en", tweet_mode='extended').items(1000)

print(tweets)

# Extract data
data = []
for tweet in tweets:
    data.append(tweet.full_text)

# Save data to a file
with open('twitter_data.txt', 'w', encoding='utf-8') as f:
    for item in data:
        f.write("%s\n" % item)


Unauthorized: 401 Unauthorized
89 - Invalid or expired token.

Data Preprocessing with Spark

In [7]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName('TwitterSentimentAnalysis').getOrCreate()

# Read data from file
data = spark.read.text('twitter_data.txt')
data.show(5)


JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [12]:
#Clean and Preprocess Text Data
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Clean text
def clean_text(text):
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    text = re.sub(r'\@\w+|\#', '', text)
    text = text.encode('ascii', 'ignore').decode('ascii')  # Remove emojis
    text = re.sub(r'\s+', ' ', text).strip()
    return text

clean_text_udf = udf(lambda x: clean_text(x), StringType())
data_cleaned = data.withColumn('cleaned_text', clean_text_udf(data['value']))

# Tokenize text
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='words')
data_tokenized = tokenizer.transform(data_cleaned)

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
data_no_stopwords = remover.transform(data_tokenized)

# Lemmatize text
lemmatizer = WordNetLemmatizer()

def lemmatize_words(words):
    return [lemmatizer.lemmatize(word) for word in words]

lemmatize_udf = udf(lambda x: lemmatize_words(x), ArrayType(StringType()))
data_lemmatized = data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(data_no_stopwords['filtered_words']))

# Convert to TF-IDF features
hashingTF = HashingTF(inputCol='lemmatized_words', outputCol='raw_features', numFeatures=20)
featurized_data = hashingTF.transform(data_lemmatized)

idf = IDF(inputCol='raw_features', outputCol='features')
idfModel = idf.fit(featurized_data)
rescaled_data = idfModel.transform(featurized_data)

rescaled_data.select('features').show()



In [23]:
#Clean and Preprocess Text Data
import re
import nltk
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, StringType, IntegerType, DoubleType
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Initialize Spark session
spark = SparkSession.builder.appName('TwitterSentimentAnalysis').getOrCreate()

schema = StructType([
    StructField("label", IntegerType()),
    StructField("ids", IntegerType()),
    StructField("date", StringType()),
    StructField("flag", StringType()),
    StructField("user", StringType()),
    StructField("text", StringType()),
])

# Load Sentiment140 dataset
sentiment_data = spark.read.csv('sentiment140.csv', header=False, schema=schema)

#column_length = sentiment_data.select('label').count()
#len04 = sentiment_data.select("label").filter(sentiment_data['label'] == 4).count()
#print(column_length, len04)
# first 800000 points are labelled with '0' and the remaining 800000 are labelled '4', instances of class '2' are not present in the dataset

# Clean text
def clean_text(text):
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    text = re.sub(r'\@\w+|\#', '', text)
    text = text.encode('ascii', 'ignore').decode('ascii')  # Remove emojis
    text = re.sub(r'\s+', ' ', text).strip()
    return text

clean_text_udf = udf(lambda x: clean_text(x), StringType())
sentiment_data = sentiment_data.withColumnRenamed('text', 'value')
sentiment_data_cleaned = sentiment_data.withColumn('cleaned_text', clean_text_udf(sentiment_data['value']))

# Tokenize text
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='words')
sentiment_data_tokenized = tokenizer.transform(sentiment_data_cleaned)

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
sentiment_data_no_stopwords = remover.transform(sentiment_data_tokenized)

# Lemmatize text
lemmatizer = WordNetLemmatizer()

def lemmatize_words(words):
    return [lemmatizer.lemmatize(word) for word in words]

nltk.download('wordnet')
lemmatize_udf = udf(lambda x: lemmatize_words(x), ArrayType(StringType()))
sentiment_data_lemmatized = sentiment_data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(sentiment_data_no_stopwords['filtered_words']))

# Convert to TF-IDF features
hashingTF = HashingTF(inputCol='lemmatized_words', outputCol='raw_features', numFeatures=20)
featurized_sentiment_data = hashingTF.transform(sentiment_data_lemmatized)

idf = IDF(inputCol='raw_features', outputCol='features')
idfModel = idf.fit(featurized_sentiment_data)
rescaled_sentiment_data = idfModel.transform(featurized_sentiment_data)

rescaled_sentiment_data.select('features').show()


#featurized_sentiment_data = hashingTF.transform(sentiment_data_lemmatized)
#rescaled_sentiment_data = idfModel.transform(featurized_sentiment_data)

'''
data_cleaned = data.withColumn('cleaned_text', clean_text_udf(data['value']))

# Tokenize text
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='words')
data_tokenized = tokenizer.transform(data_cleaned)

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
data_no_stopwords = remover.transform(data_tokenized)

# Lemmatize text
lemmatizer = WordNetLemmatizer()

def lemmatize_words(words):
    return [lemmatizer.lemmatize(word) for word in words]

lemmatize_udf = udf(lambda x: lemmatize_words(x), ArrayType(StringType()))
data_lemmatized = data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(data_no_stopwords['filtered_words']))

# Convert to TF-IDF features
hashingTF = HashingTF(inputCol='lemmatized_words', outputCol='raw_features', numFeatures=20)
featurized_data = hashingTF.transform(data_lemmatized)

idf = IDF(inputCol='raw_features', outputCol='features')
idfModel = idf.fit(featurized_data)
rescaled_data = idfModel.transform(featurized_data)

rescaled_data.select('features').show()
'''


[nltk_data] Downloading package wordnet to /home/saketh/nltk_data...
[Stage 38:>                                                         (0 + 1) / 1]

+--------------------+
|            features|
+--------------------+
|(20,[1,5,10,11,12...|
|(20,[1,4,6,7,10,1...|
|(20,[0,1,5,7,8,11...|
|(20,[1,7,10,11,15...|
|(20,[4,6,12,14,15...|
|(20,[7,18],[1.241...|
|(20,[17,18],[1.06...|
|(20,[4,7,8,9,11,1...|
|(20,[14],[1.36014...|
|(20,[7],[3.724169...|
|(20,[4,7,8,11],[1...|
|(20,[4,6],[1.2330...|
|(20,[5,8,9,11,13,...|
|(20,[1,5,7,12,18,...|
|(20,[4,6,10,12,17...|
|(20,[2,8,10,11,14...|
|(20,[2,3,4,6,8,9,...|
|(20,[7,8],[1.2413...|
|(20,[0,3,9,11,12,...|
|(20,[2,4,8,9,11,1...|
+--------------------+
only showing top 20 rows



                                                                                

"\ndata_cleaned = data.withColumn('cleaned_text', clean_text_udf(data['value']))\n\n# Tokenize text\ntokenizer = Tokenizer(inputCol='cleaned_text', outputCol='words')\ndata_tokenized = tokenizer.transform(data_cleaned)\n\n# Remove stop words\nremover = StopWordsRemover(inputCol='words', outputCol='filtered_words')\ndata_no_stopwords = remover.transform(data_tokenized)\n\n# Lemmatize text\nlemmatizer = WordNetLemmatizer()\n\ndef lemmatize_words(words):\n    return [lemmatizer.lemmatize(word) for word in words]\n\nlemmatize_udf = udf(lambda x: lemmatize_words(x), ArrayType(StringType()))\ndata_lemmatized = data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(data_no_stopwords['filtered_words']))\n\n# Convert to TF-IDF features\nhashingTF = HashingTF(inputCol='lemmatized_words', outputCol='raw_features', numFeatures=20)\nfeaturized_data = hashingTF.transform(data_lemmatized)\n\nidf = IDF(inputCol='raw_features', outputCol='features')\nidfModel = idf.fit(featurized_data)\nrescale

In [25]:
# Specify the path where you want to save the IDF model
idf_model_path = "idf_model"

# Save the IDF model
idfModel.save(idf_model_path)

                                                                                

In [28]:
output_path = "rescaled_sentiment_data.csv"
rescaled_sentiment_data.select('features').write.option("header", "true").csv(output_path)

AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support the column `features` of the type "STRUCT<type: TINYINT, size: INT, indices: ARRAY<INT>, values: ARRAY<DOUBLE>>".

Feature Engineering for Sentiment Analysis

In [None]:
'''
#Create Pipeline for Model Training
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Load Sentiment140 dataset
sentiment_data = spark.read.csv('path/to/sentiment140.csv', header=True, inferSchema=True)

# Preprocess Sentiment140 data (similar steps as above)
sentiment_data = sentiment_data.withColumnRenamed('text', 'value')
sentiment_data_cleaned = sentiment_data.withColumn('cleaned_text', clean_text_udf(sentiment_data['value']))
sentiment_data_tokenized = tokenizer.transform(sentiment_data_cleaned)
sentiment_data_no_stopwords = remover.transform(sentiment_data_tokenized)
sentiment_data_lemmatized = sentiment_data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(sentiment_data_no_stopwords['filtered_words']))
featurized_sentiment_data = hashingTF.transform(sentiment_data_lemmatized)
rescaled_sentiment_data = idfModel.transform(featurized_sentiment_data)

# Assuming the Sentiment140 data has a 'sentiment' column
labeled_data = rescaled_sentiment_data.withColumn('label', sentiment_data['sentiment'])

# Split the data
(training_data, test_data) = labeled_data.randomSplit([0.8, 0.2])

# Build the Logistic Regression Model
lr = LogisticRegression(featuresCol='features', labelCol='label', family='multinomial')

# Create a pipeline
pipeline = Pipeline(stages=[hashingTF, idf, lr])

# Train the model
model = pipeline.fit(training_data)

# Test the model
predictions = model.transform(test_data)
predictions.select('prediction', 'label', 'features').show()
'''

ML FLOW TRACKING

In [None]:
#Create Pipeline for Model Training
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

'''
# Load Sentiment140 dataset
sentiment_data = spark.read.csv('path/to/sentiment140.csv', header=True, inferSchema=True)

# Preprocess Sentiment140 data (similar steps as above)
sentiment_data = sentiment_data.withColumnRenamed('text', 'value')
sentiment_data_cleaned = sentiment_data.withColumn('cleaned_text', clean_text_udf(sentiment_data['value']))
sentiment_data_tokenized = tokenizer.transform(sentiment_data_cleaned)
sentiment_data_no_stopwords = remover.transform(sentiment_data_tokenized)
sentiment_data_lemmatized = sentiment_data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(sentiment_data_no_stopwords['filtered_words']))
featurized_sentiment_data = hashingTF.transform(sentiment_data_lemmatized)
rescaled_sentiment_data = idfModel.transform(featurized_sentiment_data)

# Assuming the Sentiment140 data has a 'sentiment' column
labeled_data = rescaled_sentiment_data.withColumn('label', sentiment_data['target'])
'''

# Split the data
(training_data, test_data) = labeled_data.randomSplit([0.8, 0.2])

# MLflow experiment tracking
mlflow.set_experiment("Sentiment Analysis")

with mlflow.start_run() as run:
    
    # Build the Logistic Regression Model
    lr = LogisticRegression(featuresCol='features', labelCol='label', family='multinomial')
    
    # Create a pipeline
    pipeline = Pipeline(stages=[hashingTF, idf, lr])
    
    # Log parameters
    mlflow.log_param("model_type", "LogisticRegression")
    mlflow.log_param("hashingTF_numFeatures", 100)
    
    # Train the model
    model = pipeline.fit(training_data)
    
    # Save the trained model for later use
    model_path = "sentiment_model"
    model.save(model_path)
    mlflow.spark.log_model(model, "model")
    
    # Load the trained model
    model = PipelineModel.load(model_path)
    
    # Test the model
    predictions = model.transform(test_data)
    predictions.select('prediction', 'label', 'features').show()
    
    # Evaluate the model
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

In [None]:
# Define the FastAPI app
app = FastAPI()

# Data model for request
class TextData(BaseModel):
    texts: List[str]

# Define the endpoint for sentiment analysis
@app.post("/analyze-sentiment")
async def analyze_sentiment(data: TextData):
    texts = data.texts

    # Create a DataFrame from the input texts
    df = spark.createDataFrame([(text,) for text in texts], ["text"])

    # Process the DataFrame as per preprocessing pipeline
    df_cleaned = df.withColumn('cleaned_text', clean_text_udf(df['text']))
    df_tokenized = tokenizer.transform(df_cleaned)
    df_no_stopwords = remover.transform(df_tokenized)
    df_lemmatized = df_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(df_no_stopwords['filtered_words']))
    df_features = hashingTF.transform(df_lemmatized)
    df_rescaled = idfModel.transform(df_features)

    # Make predictions using developed model
    predictions = model.transform(df_rescaled)
    results = predictions.select('prediction').collect()

    # Map predictions to sentiment labels
    sentiment_mapping = {0: 'negative', 1: 'positive'}  #Labels can be adjusted accordingly
    sentiments = [sentiment_mapping[int(row.prediction)] for row in results]

    return {"sentiments": sentiments}

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

Pipeline Orchestration with Apache Airflow

In [None]:
# Initialize the database
airflow db init

# Create a user
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com

# Start the Airflow web server
airflow webserver --port 8080

# Start the Airflow scheduler
airflow scheduler


In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

dag = DAG(
    'twitter_sentiment_analysis',
    default_args=default_args,
    description='A simple Twitter sentiment analysis DAG',
    schedule_interval='@daily',
)

def collect_data():
    import tweepy
    # Replace these with your own Twitter API keys and tokens
    consumer_key = 'your_consumer_key'
    consumer_secret = 'your_consumer_secret'
    access_token = 'your_access_token'
    access_token_secret = 'your_access_token_secret'

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)

    hashtag = "#examplehashtag"
    tweets = tweepy.Cursor(api.search_tweets, q=hashtag, lang="en", tweet_mode='extended').items(1000)

    data = []
    for tweet in tweets:
        data.append(tweet.full_text)

    with open('/path/to/twitter_data.txt', 'w', encoding='utf-8') as f:
        for item in data:
            f.write("%s\n" % item)

def preprocess_data():
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, ArrayType
    from nltk.tokenize import word_tokenize
    from nltk.stem import WordNetLemmatizer
    from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
    import re

    spark = SparkSession.builder.appName('TwitterSentimentAnalysis').getOrCreate()
    data = spark.read.text('/path/to/twitter_data.txt')

    def clean_text(text):
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
        text = re.sub(r'\@\w+|\#', '', text)
        text = text.encode('ascii', 'ignore').decode('ascii')
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    clean_text_udf = udf(lambda x: clean_text(x), StringType())
    data_cleaned = data.withColumn('cleaned_text', clean_text_udf(data['value']))

    tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='words')
    data_tokenized = tokenizer.transform(data_cleaned)

    remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
    data_no_stopwords = remover.transform(data_tokenized)

    lemmatizer = WordNetLemmatizer()

    def lemmatize_words(words):
        return [lemmatizer.lemmatize(word) for word in words]

    lemmatize_udf = udf(lambda x: lemmatize_words(x), ArrayType(StringType()))
    data_lemmatized = data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(data_no_stopwords['filtered_words']))

    hashingTF = HashingTF(inputCol='lemmatized_words', outputCol='raw_features', numFeatures=20)
    featurized_data = hashingTF.transform(data_lemmatized)

    idf = IDF(inputCol='raw_features', outputCol='features')
    idfModel = idf.fit(featurized_data)
    rescaled_data = idfModel.transform(featurized_data)

    rescaled_data.write.save('/path/to/preprocessed_data', format='parquet')

def train_model():
    from pyspark.sql import SparkSession
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml import Pipeline

    spark = SparkSession.builder.appName('TwitterSentimentAnalysis').getOrCreate()
    rescaled_data = spark.read.load('/path/to/preprocessed_data')

    sentiment_data = spark.read.csv('path/to/sentiment140.csv', header=True, inferSchema=True)
    sentiment_data = sentiment_data.withColumnRenamed('text', 'value')
    sentiment_data_cleaned = sentiment_data.withColumn('cleaned_text', clean_text_udf(sentiment_data['value']))
    sentiment_data_tokenized = tokenizer.transform(sentiment_data_cleaned)
    sentiment_data_no_stopwords = remover.transform(sentiment_data_tokenized)
    sentiment_data_lemmatized = sentiment_data_no_stopwords.withColumn('lemmatized_words', lemmatize_udf(sentiment_data_no_stopwords['filtered_words']))
    featurized_sentiment_data = hashingTF.transform(sentiment_data_lemmatized)
    rescaled_sentiment_data = idfModel.transform(featurized_sentiment_data)
    labeled_data = rescaled_sentiment_data.withColumn('label', sentiment_data['sentiment'])

    (training_data, test_data) = labeled_data.randomSplit([0.8, 0.2])

    lr = LogisticRegression(featuresCol='features', labelCol='label')
    pipeline = Pipeline(stages=[hashingTF, idf, lr])

    model = pipeline.fit(training_data)
    predictions = model.transform(test_data)
    predictions.select('prediction', 'label', 'features').show()

collect_data_task = PythonOperator(
    task_id='collect_data',
    python_callable=collect_data,
    dag=dag,
)

preprocess_data_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

collect_data_task >> preprocess_data_task >> train_model_task
