In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=bd50fa55fabfce659e83fe67bc75fb787ccc65f2af3527ac56b8c0cbf0505c14
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
import os

# Point Google Cloud client libraries at a service-account key file, but only when the environment
# has not configured one already, so real credentials are never overwritten by the placeholder.
# Replace the placeholder with the path to your key file (or export the variable before starting Jupyter).
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "replace with your credentials")


In [None]:
from google.cloud import bigquery

# BigQuery client built with default settings; credentials and project come from the environment.
# NOTE(review): `client` is not referenced by any later cell.
client = bigquery.Client()


In [None]:
import os

import pandas as pd
from google.cloud import bigquery
from google.cloud import storage

# Source object: the Reddit climate-change comments CSV in GCS.
bucket_name = 'reddit_data_big'
file_path = 'the-reddit-climate-change-dataset-comments.csv'

# Local copy that both pandas and Spark read from below.
temp_file = '/tmp/temp_file.csv'

# GCS handle for the CSV object.
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)

# Downloading the full CSV is slow, so re-runs reuse an existing local copy.
# Delete temp_file to force a fresh download if the object in GCS changes.
if not os.path.exists(temp_file):
    # Download to a side file and rename it into place, so an interrupted download
    # never leaves a truncated temp_file that later runs would silently reuse.
    partial_file = temp_file + '.part'
    blob.download_to_filename(partial_file)
    os.replace(partial_file, temp_file)

# Read the CSV file into a pandas DataFrame.
df = pd.read_csv(temp_file)

In [None]:
import math

import numpy as np
import pandas as pd
# Bind the display() *function*; `from IPython import display` bound the IPython.display module
# to this name and shadowed the display() function Jupyter provides.
from IPython.display import display
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Schema types imported by name (instead of `import *`) so it is clear which names the notebook uses.
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Settings that size or name the driver JVM must be on the conf used to launch the *first*
# SparkContext; later SparkSession.builder.config(...) calls cannot change them once it is running.
cf = SparkConf()
cf.set("spark.submit.deployMode", "client")
cf.set("spark.driver.memory", "16g")  # the value the later session cell tries to apply
cf.setAppName("senti")                # the app name the session cell below asks for

# Create (or reuse) the SparkContext with this configuration.
sc = SparkContext.getOrCreate(cf)


In [None]:
# DataFrame entry point; getOrCreate() reuses the SparkContext started in the previous cell.
spark = (
    SparkSession.builder
    .appName('senti')
    .getOrCreate()
)

In [None]:
# Fields used for training: the comment text and its numeric sentiment score.
customSchema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("body", StringType()),
            ("sentiment", FloatType()),
        )
    ]
)

In [None]:
from pyspark.sql.functions import col

# Spark applies a user-supplied CSV schema by column *position* (header names are ignored while
# enforceSchema is on, its default), so the 2-field customSchema would bind "body"/"sentiment" to
# the file's first two columns. Instead, read using the header row, then pick the customSchema
# fields by name and cast them to the declared types (unparseable values become null).
# multiLine/escape handle quoted bodies containing newlines and doubled quotes, matching how
# pandas parses the same file.
df1 = (
    spark.read.format("csv")
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .load(temp_file)
    .select([col(field.name).cast(field.dataType).alias(field.name) for field in customSchema.fields])
)


In [None]:
# Discard any row with a null in any column (body or sentiment).
data = df1.dropna(how="any")

In [None]:
from pyspark.sql.functions import when

# Turn the continuous sentiment score into a class label:
#   score > 0 -> 0,  score < 0 -> 1,  anything else -> 2
data_with_sentiment = data.withColumn(
    "sentiment_label",
    when(data.sentiment > 0, 0)
    .when(data.sentiment < 0, 1)
    .otherwise(2),
)


In [None]:
# Keep only the text and its class label for the ML pipeline.
data_final = data_with_sentiment.select("body", "sentiment_label")

## *Tokenizing and Cleaning the Data*

The RegexTokenizer splits each comment into an array of lowercase tokens, breaking on non-word characters. Stop words (frequent tokens that carry little meaning) are then removed with the StopWordsRemover from the `pyspark.ml.feature` module. Note that the notebook passes its own short stop-word list, which replaces Spark's default English list rather than adding to it. CountVectorizer takes the output of the StopWordsRemover as input and builds a vocabulary of the most frequent words in the documents. It then maps each document to a vector of word counts.

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Split each comment into tokens on non-word characters.
regexTokenizer = RegexTokenizer(pattern="\\W", inputCol="body", outputCol="words")

# Custom stop-word list. It is passed as the complete list, so it replaces Spark's default
# English stop words rather than extending them.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=add_stopwords)

# Bag-of-words counts: vocabulary capped at 30000 terms, each present in at least 5 documents.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", minDF=5, vocabSize=30000)

## *Defining the Pipeline*
This cell defines the label_stringIdx stage and combines it with the previously defined regexTokenizer, stopwordsRemover, and countVectors stages into a single data-processing pipeline.

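The StringIndexer stage encodes the target variable sentiment_label as a numeric index in a new column called label. By default StringIndexer assigns indices by label frequency (most frequent first), so the values in label need not equal the original sentiment_label values.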

The pipeline is then fit to data_final using the fit method. Note that data_final is the full dataset, before the train/test split below, so the vocabulary and label index are also built from the test rows. The resulting pipeline model's transform method applies every stage to the data. It adds the words, filtered, features (the bag-of-words vector), and label columns.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Encode sentiment_label as the numeric "label" column used as the training target.
label_stringIdx = StringIndexer(inputCol="sentiment_label", outputCol="label")

# Stages run in order: tokenize -> remove stop words -> count terms -> index labels.
pipeline = Pipeline().setStages(
    [regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]
)

# Fit on all of data_final (before the train/test split below) and apply the fitted stages to it.
pipelineFit = pipeline.fit(data_final)
dataset = pipelineFit.transform(data_final)

In [None]:
# Persist the fitted feature pipeline; overwrite() keeps the cell re-runnable once "pipeline" exists
# (plain save() fails if the path is already there).
pipelineFit.write().overwrite().save("pipeline")

## *Splitting the Historical Dataset into Training and Test Data*

In [None]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 765907
Test Dataset Count: 327969


## *Defining the Model for Prediction of Sentiments*
This code block is fitting a logistic regression model to the training data and making predictions on the test data using the trained model.

First, a LogisticRegression object is created with some specified hyperparameters (maxIter, regParam, and elasticNetParam).

Next, the fit() method of the LogisticRegression object is called with the training data as input, which trains the logistic regression model on the training data. The resulting trained model is stored in the lrModel variable.

Finally, the trained model is used to make predictions on the test data using the transform() method. The resulting predictions are stored in the predictions variable.

In [None]:
# Logistic regression on the bag-of-words features; elasticNetParam=0 makes regParam a pure L2 penalty.
lr = LogisticRegression(
    maxIter=100,
    regParam=0.5,
    elasticNetParam=0,
)

# Train on the training split, then score the held-out test split.
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

In [None]:
# Persist the trained model; overwrite() keeps the cell re-runnable once "model" exists.
# (`predictions` on the test split is already computed by the previous cell.)
lrModel.write().overwrite().save("model")

In [None]:
import warnings

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Desired driver settings for the pandas -> Spark conversion below.
conf = SparkConf()
conf.set("spark.submit.deployMode", "client")
conf.set("spark.driver.memory", "16g")

# NOTE: spark.driver.memory sizes the driver JVM, so it only takes effect when set before the first
# SparkContext is launched. A session already exists at this point, so getOrCreate() returns it and
# the memory setting is not re-applied.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Make an ineffective setting visible instead of silently running with a smaller heap.
effective_driver_memory = sc.getConf().get("spark.driver.memory", None)
if effective_driver_memory != conf.get("spark.driver.memory"):
    warnings.warn(
        f"spark.driver.memory is {effective_driver_memory!r}, not {conf.get('spark.driver.memory')!r}; "
        "set it on the SparkConf used to create the first SparkContext and restart the kernel."
    )


In [None]:
# Re-read the full comments CSV (all columns) from the local copy downloaded earlier.
df = pd.read_csv(filepath_or_buffer=temp_file)

In [None]:
# Shorten the dotted subreddit.* column names to the names used by the Spark and BigQuery schemas below.
df = df.rename(
    columns={
        'subreddit.nsfw': 'snsfw',
        'subreddit.id': 'sid',
        'subreddit.name': 'sname',
    }
)


In [None]:
# Spark schema for the full comment table, listed in the same column order as `df`.
schema = StructType(
    [
        StructField(column_name, column_type, nullable=True)
        for column_name, column_type in [
            ("type", StringType()),
            ("id", StringType()),
            ("sid", StringType()),
            ("sname", StringType()),
            ("snsfw", StringType()),
            ("created_utc", IntegerType()),
            ("permalink", StringType()),
            ("body", StringType()),
            ("sentiment", FloatType()),
            ("score", IntegerType()),
        ]
    ]
)

# The pandas frame is converted to Spark in slices of chunk_size rows.
chunk_size = 10_000
num_chunks = 1 + len(df) // chunk_size

def process_chunk(chunk):
    """Predict a sentiment class for every comment in one pandas chunk.

    The chunk's text is sent to Spark and run through the fitted feature pipeline
    (``pipelineFit``) and the logistic-regression model (``lrModel``). The results are then
    brought back to pandas.

    Args:
        chunk: pandas DataFrame slice of ``df``; only its ``body`` column is used.

    Returns:
        pandas DataFrame with ``body`` and ``prediction`` columns, one row per input row, indexed
        with ``chunk.index``. Results from several chunks can therefore be concatenated and joined
        back onto ``df`` by index. ``prediction`` is the model's class index in the StringIndexer
        ``label`` space, not the raw ``sentiment_label`` value.

    Raises:
        ValueError: if Spark returns a different number of rows than the chunk contained.
    """
    # Only the text is needed for prediction. Ship just that column, with a local schema, instead
    # of every column under the module-level `schema` global.
    body_schema = StructType([StructField("body", StringType(), nullable=True)])
    spark_df_chunk = spark.createDataFrame(chunk[["body"]], schema=body_schema)

    # Same feature stages as training. The StringIndexer stage has no sentiment_label input here;
    # Spark's StringIndexerModel skips a missing input column.
    chunk_transformed = pipelineFit.transform(spark_df_chunk)
    predictions = lrModel.transform(chunk_transformed)

    body_predictions_pandas = predictions.select("body", "prediction").toPandas()

    # toPandas() returns a fresh 0..n-1 index. Restore the chunk's row labels so concatenated
    # chunks line up with `df` instead of every chunk repeating 0..n-1.
    # Assumes Spark keeps row order through these row-wise transformations.
    if len(body_predictions_pandas) != len(chunk):
        raise ValueError(
            f"expected {len(chunk)} predictions for this chunk, got {len(body_predictions_pandas)}"
        )
    body_predictions_pandas.index = chunk.index
    return body_predictions_pandas

# Run each chunk of `df` through process_chunk and collect the pandas results.
results = []
for i in range(num_chunks):
    start = i * chunk_size
    end = start + chunk_size
    chunk = df.iloc[start:end]
    chunk_predictions = process_chunk(chunk)
    # Label the result with the rows of `df` it came from. Spark's toPandas() restarts the index
    # at 0, which would make every chunk's rows collide in the later index-based join.
    chunk_predictions.index = chunk.index
    results.append(chunk_predictions)

# Concatenate once after the loop; the index now matches the row labels of `df`.
body_predictions_pandas = pd.concat(results)


In [None]:
# Attach the model predictions to the full comment table.
# Predictions were produced chunk by chunk in the same row order as `df`, so align them by
# position. An index-based merge fans out when the chunk results carry repeated 0..n-1 indexes.
predictions_in_row_order = body_predictions_pandas["prediction"].reset_index(drop=True)
if len(predictions_in_row_order) != len(df):
    raise ValueError(
        f"expected {len(df)} predictions, got {len(predictions_in_row_order)}"
    )
merged_df = df.assign(prediction=predictions_in_row_order.to_numpy())

# De-duplicate on the comment id, not the comment text: distinct comments can share the same
# body text and must all be kept.
merged_df = merged_df.drop_duplicates(subset='id', keep='first')

In [None]:
# Preview the first rows of the merged table rather than rendering the whole frame.
merged_df.head()

In [None]:
project_id = 'replace with your project id'
dataset_name = 'reddit'
table_name = 'comment'

# BigQuery client bound to the target project.
bigquery_client = bigquery.Client(project=project_id)

# Reference to the target dataset (built directly instead of via the deprecated Client.dataset()).
dataset_ref = bigquery.DatasetReference(project_id, dataset_name)

# Create the dataset if it does not exist yet. get_dataset() raises NotFound for a missing dataset
# rather than returning a falsy value, so create_dataset(exists_ok=True) is the idempotent form.
bigquery_client.create_dataset(dataset_ref, exists_ok=True)

# Destination schema; matches the columns of merged_df (the renamed subreddit.* columns plus prediction).
# Kept under its own name so the Spark `schema` used by process_chunk is not overwritten.
bq_schema = [
    bigquery.SchemaField("type", "STRING"),
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("sid", "STRING"),
    bigquery.SchemaField("sname", "STRING"),
    bigquery.SchemaField("snsfw", "BOOL"),
    bigquery.SchemaField("created_utc", "INTEGER"),
    bigquery.SchemaField("permalink", "STRING"),
    bigquery.SchemaField("body", "STRING"),
    bigquery.SchemaField("sentiment", "FLOAT"),
    bigquery.SchemaField("score", "INTEGER"),
    bigquery.SchemaField("prediction", "FLOAT"),
]

# Table reference and definition.
table_ref = dataset_ref.table(table_name)
table = bigquery.Table(table_ref, schema=bq_schema)

# exists_ok=True keeps the cell re-runnable: an existing table is returned instead of raising Conflict.
table = bigquery_client.create_table(table, exists_ok=True)

print(f'Table ready: {project_id}.{dataset_name}.{table_name}')




Table created: famous-athlete-386604.reddit.comment


In [None]:
# Load merged_df into the BigQuery table. Load jobs append by default, so re-running the notebook
# would add a second copy of every row; WRITE_TRUNCATE replaces the table contents instead.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
job = bigquery_client.load_table_from_dataframe(merged_df, table_ref, job_config=job_config)
job.result()  # block until the load job finishes; raises if the job failed

print(f'Data uploaded to BigQuery table: {project_id}.{dataset_name}.{table_name}')