In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

# Create Spark session
spark = SparkSession.builder.appName("HateSpeechDetection").getOrCreate()

# Load data
df = spark.read.csv("twitter_practical(4A).csv", header=True, inferSchema=True)
df = df.withColumnRenamed("tweet", "text").withColumnRenamed("label", "category")
df = df.dropna(subset=["text", "category"])

# Binary target: 1 = hate speech, 0 = not hate speech.
# Mapping category == 0 to "Hate" labelled the MAJORITY class as hate speech.
# The recorded report showed 5952 of 6358 test tweets as "Hate Speech", and
# benign tweets ("Wow, this is amazing! Great job!") were predicted as hate.
# Category 0 is the ~94% majority here, so non-zero categories are treated as hate.
# NOTE(review): confirm against the dataset's documented label encoding.
# A native column expression also avoids the per-row Python UDF round-trip.
df = df.withColumn("label", (col("category") != 0).cast(IntegerType()))

# Clean text
def clean_text(df, text_col="text"):
    """Normalise tweet text before tokenization.

    Args:
        df: Spark DataFrame with a string column named ``text_col``.
        text_col: Column to clean; it is overwritten in place (default "text").

    Returns:
        A DataFrame in which ``text_col`` is lower-cased and stripped of URLs,
        @mentions, #hashtags (including the tag word) and every character
        other than ASCII letters and whitespace. Whitespace runs are collapsed
        to a single space and trimmed at both ends.
    """
    removal_patterns = [
        r"http\S+",       # URLs
        r"@\w+",          # @mentions
        r"#\w+",          # hashtags (the whole tag is dropped)
        r"[^a-zA-Z\s]",   # digits, punctuation, emoji, ...
    ]
    df = df.withColumn(text_col, lower(col(text_col)))
    for pattern in removal_patterns:
        df = df.withColumn(text_col, regexp_replace(col(text_col), pattern, ""))
    # Spark's Tokenizer splits on single whitespace characters. The removals above
    # leave runs of spaces (and leading spaces), which would otherwise become ""
    # tokens that StopWordsRemover keeps and HashingTF counts as a real feature.
    df = df.withColumn(text_col, regexp_replace(col(text_col), r"\s+", " "))
    df = df.withColumn(text_col, regexp_replace(col(text_col), r"^\s+|\s+$", ""))
    return df

df = clean_text(df)

# Feature extraction and classifier chained into one Spark ML pipeline:
# text -> tokens -> tokens without stop words -> hashed TF (5000 buckets) -> TF-IDF -> LR.
# Each stage reads the previous stage's output column.
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="rawFeatures", numFeatures=5000)
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
lr = LogisticRegression(labelCol="label", featuresCol=idf.getOutputCol(), maxIter=20)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# Split 80/20 with a fixed seed, fit on the training part, then score the held-out part.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

# Evaluate
# The classes are heavily imbalanced (the recorded report has 5952 vs 406 test rows),
# so overall accuracy says little; the per-class report below is the useful view.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Classification report
# Collect label and prediction together in ONE job, so every (y_true, y_pred) pair
# comes from the same row. Two separate collects re-run the whole pipeline twice
# and rely on both runs returning rows in the same order.
label_pred_rows = predictions.select("label", "prediction").collect()
y_true = [int(row["label"]) for row in label_pred_rows]
y_pred = [int(row["prediction"]) for row in label_pred_rows]
# labels=[0, 1] keeps target_names aligned even if one class is missing from the split.
print(classification_report(y_true, y_pred, labels=[0, 1], target_names=["Not Hate Speech", "Hate Speech"]))

# -------------------------------
# Test custom tweets
# -------------------------------
sample_tweets = [
    ("I hate this so much! This is terrible!",),
    ("Wow, this is amazing! Great job!",),
    ("You are so dumb, I can't believe this!",),
    ("Wishing everyone a happy and peaceful day!",),
    ("This is absolutely disgusting and unacceptable!",)
]

# One-column DataFrame, cleaned exactly like the training data
sample_df = clean_text(spark.createDataFrame(sample_tweets, ["text"]))

# Score with the fitted pipeline; keep only the cleaned text and the predicted class
results = model.transform(sample_df).select("text", "prediction")

# Report each tweet: prediction 1.0 means hate speech, anything else does not
verdict_by_prediction = {1.0: "Hate Speech"}
for row in results.collect():
    verdict = verdict_by_prediction.get(row["prediction"], "Not Hate Speech")
    print(f"\nTweet: {row['text']}\nPrediction: {verdict}")


Accuracy: 0.9166404529726329
                 precision    recall  f1-score   support

Not Hate Speech       0.38      0.50      0.43       406
    Hate Speech       0.97      0.94      0.96      5952

       accuracy                           0.92      6358
      macro avg       0.67      0.72      0.69      6358
   weighted avg       0.93      0.92      0.92      6358


Tweet: i hate this so much this is terrible
Prediction: Hate Speech

Tweet: wow this is amazing great job
Prediction: Hate Speech

Tweet: you are so dumb i cant believe this
Prediction: Not Hate Speech

Tweet: wishing everyone a happy and peaceful day
Prediction: Hate Speech

Tweet: this is absolutely disgusting and unacceptable
Prediction: Hate Speech


In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, when, rand
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Start the Spark session, then the wall-clock timer reported at the end of the cell
spark = (
    SparkSession.builder
    .appName("FixedSentimentAnalysis")
    .getOrCreate()
)
start_time = time.time()

# Load the dataset, keeping only the tweet text and its label
raw_tweets = spark.read.csv("twitter.csv", header=True, inferSchema=True)
df = raw_tweets.select("tweet", "label")

# Clean tweets: lower-case first, then strip each noise pattern in turn
# (URLs, @mentions, the '#' sign, punctuation, digits).
df_clean = df.withColumn("tweet", lower(col("tweet")))
for noise_pattern in (r"http\S+|www\S+", r"@\w+", r"#", r"[^\w\s]", r"\d+"):
    df_clean = df_clean.withColumn("tweet", regexp_replace("tweet", noise_pattern, ""))

# Handle class imbalance by undersampling label-0 rows down to the label-1 count.
# Drop null tweets first: RegexTokenizer cannot tokenize a null string.
labeled = df_clean.dropna(subset=["tweet"])
positive = labeled.filter(col("label") == 1)
negative = labeled.filter(col("label") == 0)
pos_count, neg_count = positive.count(), negative.count()
if neg_count == 0:
    raise ValueError("No label-0 tweets found; cannot balance the training set")
# sample() without replacement needs a fraction in [0, 1]; cap it in case
# label-1 rows outnumber label-0 rows.
neg_fraction = min(1.0, pos_count / neg_count)
neg_sample = negative.sample(False, neg_fraction, seed=42)
# Seeded shuffle, so the training set is reproducible across runs.
df_train = neg_sample.union(positive).orderBy(rand(seed=42))

# Test sentences
test_sentences = [
    ("I hate this product! It's the worst!",),
    ("This is amazing! I love it.",),
    ("Not bad, but could be better.",),
    ("Absolutely terrible experience.",),
    ("Had a fantastic time using this app.",),
]
df_test = spark.createDataFrame(test_sentences, ["tweet"])

# Clean test tweets with the same steps, in the same order, as the training tweets:
# lower-case FIRST, then strip the patterns. Some patterns are case-sensitive
# (e.g. "http\S+|www\S+"), so lowercasing last would leave "HTTP://..." or "WWW..."
# fragments in test text that training text never contains.
df_test = df_test.withColumn("tweet", lower(col("tweet")))
for pattern in [r"http\S+|www\S+", r"@\w+", r"#", r"[^\w\s]", r"\d+"]:
    df_test = df_test.withColumn("tweet", regexp_replace("tweet", pattern, ""))

# ML Pipeline: split on non-word characters -> drop stop words ->
# term counts over a 10k-term vocabulary -> IDF weighting -> logistic regression
# trained directly on the original label column.
stages = [
    RegexTokenizer(inputCol="tweet", outputCol="words", pattern="\\W"),
    StopWordsRemover(inputCol="words", outputCol="filtered_words"),
    CountVectorizer(inputCol="filtered_words", outputCol="raw_features", vocabSize=10000),
    IDF(inputCol="raw_features", outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
]
tokenizer, remover, vectorizer, idf, lr = stages
pipeline = Pipeline(stages=stages)

# Train model on the balanced training set
model = pipeline.fit(df_train)

# Predict
predictions = model.transform(df_test)

# Map prediction: class 1.0 -> "Negative", anything else -> "Positive"
predictions = predictions.withColumn(
    "sentiment",
    when(col("prediction") == 1.0, "Negative").otherwise("Positive")
)

# transform() is lazy: the scoring job only runs when show() is called.
# Stop the clock after it, so the reported time includes prediction.
predictions.select("tweet", "prediction", "sentiment").show(truncate=False)
print(f"\nExecution Time: {time.time() - start_time:.2f} seconds")

# Stop Spark, after every action on `predictions` has run
spark.stop()



Execution Time: 16.62 seconds
+-----------------------------------+----------+---------+
|tweet                              |prediction|sentiment|
+-----------------------------------+----------+---------+
|i hate this product its the worst  |1.0       |Negative |
|this is amazing i love it          |0.0       |Positive |
|not bad but could be better        |0.0       |Positive |
|absolutely terrible experience     |1.0       |Negative |
|had a fantastic time using this app|0.0       |Positive |
+-----------------------------------+----------+---------+



In [None]:
from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local", "WordCount")

try:
    # Load text file into RDD (one element per line)
    text_rdd = sc.textFile("input.txt")

    # Split each line on runs of whitespace and map every word to (word, 1).
    # str.split() with no argument drops empty strings. split(" ") yields "" for
    # repeated, leading or trailing spaces (and for blank lines), and "" would be
    # counted as a word.
    words_rdd = (
        text_rdd
        .flatMap(lambda line: line.split())
        .map(lambda word: (word, 1))
    )

    # Reduce by key (word) to count occurrences
    word_count_rdd = words_rdd.reduceByKey(lambda x, y: x + y)

    # Collect and print the results
    print(word_count_rdd.collect())
finally:
    # Stop the SparkContext even if reading or counting fails; otherwise a re-run
    # of this cell cannot construct a new SparkContext.
    sc.stop()

[('anjali', 2), ('anushka', 2), ('poonam', 2), ('tejaswini', 1), ('harshal', 1), ('supriya', 1), ('ankush', 1)]


In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MatrixMultiplication").getOrCreate()
sc = spark.sparkContext

# Example matrices in coordinate form: (row, col, value) triples.
# A is 2x3 and B is 3x2, so C = A x B is 2x2. Entries not listed contribute nothing.
matrix_A = [
    (0, 0, 4), (0, 1, 6), (0, 2, 8),
    (1, 0, 5), (1, 1, 5), (1, 2, 4)
]

matrix_B = [
    (0, 0, 7), (0, 1, 8),
    (1, 0, 9), (1, 1, 10),
    (2, 0, 11), (2, 1, 12)
]

# Convert matrices into RDDs of (row, col, value)
rdd_A = sc.parallelize(matrix_A)
rdd_B = sc.parallelize(matrix_B)

# Map phase: key A entries by their column k and B entries by their row k
mapped_A = rdd_A.map(lambda x: (x[1], (x[0], x[2])))  # k -> (i, A[i][k])
mapped_B = rdd_B.map(lambda x: (x[0], (x[1], x[2])))  # k -> (j, B[k][j])

# Join on the shared index k: (k, ((i, A[i][k]), (j, B[k][j])))
joined = mapped_A.join(mapped_B)

# Partial products A[i][k] * B[k][j], keyed by the output cell (i, j)
partial_products = joined.map(
    lambda x: ((x[1][0][0], x[1][1][0]), x[1][0][1] * x[1][1][1])
)

# Reduce phase: C[i][j] = sum over k of the partial products
result = partial_products.reduceByKey(lambda x, y: x + y)

# Collect and print results in (row, col) order.
# The loop variables are i/j, not row/col: a top-level `col` here would overwrite
# pyspark's `col` function that other cells in this notebook import and use.
output = result.collect()
for (i, j), value in sorted(output):
    print(f"({i}, {j}) -> {value}")

# Stop Spark session
spark.stop()

(0, 0) -> 170
(0, 1) -> 188
(1, 0) -> 124
(1, 1) -> 138


In [None]:
import os
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, dayofyear, max as spark_max, when, expr
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Define the base URLs for the data (one CSV per station per year)
base_url_1 = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/{}/99495199999.csv"
base_url_2 = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/{}/72429793812.csv"

# Define the range of years (2021 and 2022)
years = range(2021, 2023)

# Base directory to save the downloaded files
base_output_dir = "./weather_data/"

# Seconds to wait for the server; without a timeout, requests.get can block forever
DOWNLOAD_TIMEOUT_S = 60

# Download data for each year and station
for year in years:
    year_dir = os.path.join(base_output_dir, str(year))
    os.makedirs(year_dir, exist_ok=True)

    for base_url, station_id in [(base_url_1, "99495199999"), (base_url_2, "72429793812")]:
        url = base_url.format(year)
        try:
            response = requests.get(url, timeout=DOWNLOAD_TIMEOUT_S)
        except requests.RequestException as exc:
            # Treat network errors like a failed status: report and move on.
            # The later steps already skip files that are missing.
            print(f"Failed to download {url}. Error: {exc}")
            continue

        if response.status_code == 200:
            file_path = os.path.join(year_dir, f"{station_id}.csv")
            with open(file_path, "wb") as file:
                file.write(response.content)
            print(f"Downloaded: {file_path}")
        else:
            print(f"Failed to download {url}. Status code: {response.status_code}")

# Clean the data: drop every row that carries the "missing" sentinel in any
# of the columns listed in invalid_values.
base_input_dir = "./weather_data/"
base_output_dir = "./cleaned_weather_data/"

invalid_values = {
    "MXSPD": 999.9,
    "MAX": 9999.9,
    "MIN": 9999.9,
}

for year in range(2021, 2023):
    year_dir = os.path.join(base_input_dir, str(year))
    if not os.path.exists(year_dir):
        print(f"Year directory not found: {year_dir}")
        continue

    for station_id in ["99495199999", "72429793812"]:
        file_path = os.path.join(year_dir, f"{station_id}.csv")
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            continue

        raw_df = pd.read_csv(file_path)
        # Keep a row only if none of the checked columns holds its sentinel.
        keep = pd.Series(True, index=raw_df.index)
        for column, invalid_value in invalid_values.items():
            keep &= raw_df[column] != invalid_value
        df = raw_df[keep]

        # If one of these columns is missing on every day, the whole station is
        # wiped out. Report it instead of silently writing an empty CSV.
        # NOTE(review): station 99495199999 ends up with 0 rows; presumably one
        # column is always missing there.
        if len(raw_df) > 0 and df.empty:
            print(f"Warning: all {len(raw_df)} rows of {file_path} contain a missing-value sentinel in {list(invalid_values)}; cleaned file is empty")

        output_year_dir = os.path.join(base_output_dir, str(year))
        os.makedirs(output_year_dir, exist_ok=True)
        cleaned_file_path = os.path.join(output_year_dir, f"{station_id}.csv")
        df.to_csv(cleaned_file_path, index=False)
        print(f"Cleaned data saved to: {cleaned_file_path}")

# Spark session setup
spark = SparkSession.builder.appName("Weather Analysis").getOrCreate()

# Question 2: Load the CSV files and display the count of each dataset.
# Every (year, station) combination in year-major order; absent files are skipped.
cleaned_station_files = [
    (year, station_id, os.path.join(base_output_dir, str(year), f"{station_id}.csv"))
    for year in range(2021, 2023)
    for station_id in ["99495199999", "72429793812"]
]
for year, station_id, file_path in cleaned_station_files:
    if not os.path.exists(file_path):
        continue
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    print(f"Year: {year}, Station: {station_id}, Row count: {df.count()}")

# Question 6: Percentage of days with a missing 'GUST' value (sentinel 999.9) in 2024
gust_year = 2024
base_path = f"./cleaned_weather_data/{gust_year}/"
station_codes = ['99495199999', '72429793812']
results = []

for station_code in station_codes:
    file_path = os.path.join(base_path, f"{station_code}.csv")
    if not os.path.exists(file_path):
        # The download step in this cell fetches only 2021-2022, so 2024 files are
        # normally absent. Say so instead of silently printing nothing.
        print(f"File not found: {file_path}")
        continue
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    total_count = df.count()
    missing_count = df.filter(df.GUST == 999.9).count()

    missing_percentage = (missing_count / total_count) * 100 if total_count > 0 else 0.0
    results.append((station_code, missing_percentage))

for station_code, missing_percentage in results:
    print(f"Station Code: {station_code}, Missing GUST Percentage in {gust_year}: {missing_percentage:.2f}%")

# Question 7: Monthly mean, median, mode, and standard deviation of TEMP for
# Cincinnati (station 72429793812) in 2021, the earliest year this cell downloads.
df = spark.read.csv("./cleaned_weather_data/2021/72429793812.csv", header=True, inferSchema=True)
# The cleaning step only filters MXSPD/MAX/MIN. 9999.9 is the same missing-value
# code MAX/MIN use (and GSOD uses for TEMP), so drop it here before it skews the statistics.
df_cincinnati = df.filter(col("TEMP") != 9999.9).withColumn("MONTH", month(col("DATE")))
result = df_cincinnati.groupBy("MONTH").agg(
    expr("mean(TEMP)").alias("Mean"),
    expr("percentile_approx(TEMP, 0.5)").alias("Median"),
    # NOTE(review): the SQL mode() aggregate requires Spark >= 3.4; confirm the runtime version.
    expr("mode(TEMP)").alias("Mode"),
    expr("stddev(TEMP)").alias("Standard Deviation")
)

result.orderBy("MONTH").show()

# Question 8: Top 10 lowest wind-chill days in Cincinnati (station 72429793812) in 2022.
# NWS wind-chill formula, with T in deg F and V in mph:
#   WC = 35.74 + 0.6215*T - 35.75*V^0.16 + 0.4275*T*V^0.16
# It is defined only for T <= 50 deg F and V > 3 mph.
KNOTS_TO_MPH = 1.15078
WDSP_MISSING = 999.9

df = spark.read.csv("./cleaned_weather_data/2022/72429793812.csv", header=True, inferSchema=True)
# GSOD reports WDSP in knots, so convert it before applying the mph-based formula.
# The cleaning step does not filter WDSP. A 999.9 "missing" value would pass the
# V > 3 test and produce absurdly low wind chills at the top of the ranking.
df_cincinnati = (
    df.filter(col("WDSP") != WDSP_MISSING)
      .withColumn("WDSP_MPH", col("WDSP") * KNOTS_TO_MPH)
      .filter((col("TEMP") <= 50) & (col("WDSP_MPH") > 3))
)

wind_factor = col("WDSP_MPH") ** 0.16
df_cincinnati = df_cincinnati.withColumn(
    "Wind Chill",
    35.74 + (0.6215 * col("TEMP")) - (35.75 * wind_factor) + (0.4275 * col("TEMP") * wind_factor)
)

df_cincinnati = df_cincinnati.withColumn("DATE", expr("date_format(DATE, 'yyyy-MM-dd')"))
result = df_cincinnati.select("DATE", "Wind Chill").orderBy("Wind Chill").limit(10)
result.show()

# Question 9: Count extreme-weather days for the Florida station (99495199999).
# A day counts when its FRSHTT indicator is non-zero (at least one event flag set;
# per the NOAA GSOD format; confirm).
base_directory = './cleaned_weather_data/'
candidate_paths = (os.path.join(base_directory, str(year), '99495199999.csv') for year in range(2015, 2025))
file_paths = [path for path in candidate_paths if os.path.exists(path)]

if file_paths:
    df = spark.read.csv(file_paths, header=True, inferSchema=True)
    extreme_weather_count = df.filter(col("FRSHTT") != 0).count()
    print(f"Number of days with extreme weather conditions in Florida: {extreme_weather_count}")
else:
    # Spark cannot read (or infer a schema from) an empty path list; report the gap.
    print("No cleaned Florida station files found for 2015-2024; skipping extreme-weather count.")

# Question 10: Predict max temperature for Cincinnati in Nov and Dec 2024.
# Fit a linear model MAX ~ day-of-year on the Nov/Dec days of earlier years, then
# report the largest predicted value in each month.
base_directory = './cleaned_weather_data'
file_paths = []

for year in [2022, 2023]:
    file_path = os.path.join(base_directory, str(year), '72429793812.csv')
    if os.path.exists(file_path):
        file_paths.append(file_path)

if not file_paths:
    # Spark cannot read an empty path list; skip the prediction but still stop Spark below.
    print("No cleaned Cincinnati files found for 2022-2023; skipping temperature prediction.")
else:
    historical_data = spark.read.csv(file_paths, header=True, inferSchema=True)
    historical_df = historical_data.filter(
        (col("STATION") == "72429793812") & (month("DATE").isin([11, 12]))
    )

    training_data = historical_df.withColumn("DAY_OF_YEAR", dayofyear("DATE"))
    assembler = VectorAssembler(inputCols=["DAY_OF_YEAR"], outputCol="features")
    train_data = assembler.transform(training_data).select("features", col("MAX").alias("label"))

    lr = LinearRegression()
    lr_model = lr.fit(train_data)

    # Days 305-365 are Nov 1 - Dec 31 in a non-leap year, matching the day-of-year
    # numbering of the 2022/2023 training data (Dec 1 = day 335).
    # NOTE(review): 2024 is a leap year, so its calendar dates sit one day later
    # than these indices.
    predictions_df = spark.createDataFrame([(day,) for day in range(305, 366)], ["DAY_OF_YEAR"])
    predictions = assembler.transform(predictions_df)
    predicted_temps = lr_model.transform(predictions)

    max_predictions = predicted_temps.withColumn(
        "MONTH", when(col("DAY_OF_YEAR") < 335, 11).otherwise(12)
    ).groupBy("MONTH").agg(spark_max("prediction").alias("Max Predicted Temp"))

    max_predictions.show()

# Stop the Spark session
spark.stop()


Downloaded: ./weather_data/2021/99495199999.csv
Downloaded: ./weather_data/2021/72429793812.csv
Downloaded: ./weather_data/2022/99495199999.csv
Downloaded: ./weather_data/2022/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2021/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2021/72429793812.csv
Cleaned data saved to: ./cleaned_weather_data/2022/99495199999.csv
Cleaned data saved to: ./cleaned_weather_data/2022/72429793812.csv
Year: 2021, Station: 99495199999, Row count: 0
Year: 2021, Station: 72429793812, Row count: 365
Year: 2022, Station: 99495199999, Row count: 0
Year: 2022, Station: 72429793812, Row count: 364
+-----+------------------+------+----+------------------+
|MONTH|              Mean|Median|Mode|Standard Deviation|
+-----+------------------+------+----+------------------+
|    1|  33.9516129032258|  34.8|33.1| 4.899583041289802|
|    2| 29.95357142857143|  26.8|21.5| 9.450592070139862|
|    3|  47.8258064516129|  47.7|48.2| 7.747987598593389|


In [None]:
# Upload kaggle.json from local and install it where the Kaggle client looks for it.
import os
from google.colab import files

uploaded = files.upload()  # maps each saved filename to the uploaded bytes

# Colab renames a repeated upload (the output shows "kaggle (1).json"), so copying a
# fixed "kaggle.json" picks up a stale file. Use the bytes that were actually uploaded.
json_names = [name for name in uploaded if name.endswith(".json")]
if not json_names:
    raise FileNotFoundError("No .json file was uploaded; expected kaggle.json")
credentials = uploaded[json_names[-1]]

# Install into ~/.kaggle and into ~/.config/kaggle. The client in this environment
# reported looking in /root/.config/kaggle, so ~/.kaggle alone was not enough.
for config_dir in (os.path.expanduser("~/.kaggle"), os.path.expanduser("~/.config/kaggle")):
    os.makedirs(config_dir, exist_ok=True)
    target_path = os.path.join(config_dir, "kaggle.json")
    with open(target_path, "wb") as fh:
        fh.write(credentials)
    # Owner-only permissions, equivalent to the original `chmod 600`.
    os.chmod(target_path, 0o600)


Saving kaggle.json to kaggle (1).json


In [None]:
from pyspark import SparkContext
from kaggle.api.kaggle_api_extended import KaggleApi
import os
import zipfile

def parse_line(line):
    """Parse one line of ratings.csv into ``(movie_id, rating)``.

    Expects comma-separated ``userId,movieId,rating,timestamp`` rows.

    Returns:
        ``(int movie_id, float rating)``, or ``None`` for the header row, for blank
        or too-short lines, and for rows whose movieId/rating do not parse.
        Callers filter out ``None`` values, so one bad line (e.g. a trailing
        empty line) no longer aborts the whole Spark job.
    """
    if line.startswith("userId,movieId,rating,timestamp"):
        return None
    parts = line.split(",")
    if len(parts) < 3:
        return None
    try:
        return (int(parts[1]), float(parts[2]))
    except ValueError:
        return None

def main():
    """Download the Movies dataset from Kaggle and print average ratings for ten movies.

    Side effects: authenticates with the Kaggle API (credentials must already be
    installed), downloads and unzips 'rounakbanik/the-movies-dataset' into
    ./kaggle_data, starts and stops a local SparkContext, and prints to stdout.
    """
    # Set up Kaggle API
    api = KaggleApi()
    api.authenticate()

    # Download dataset
    download_path = "kaggle_data"
    os.makedirs(download_path, exist_ok=True)
    api.dataset_download_files('rounakbanik/the-movies-dataset', path=download_path, unzip=True)

    dataset_file = os.path.join(download_path, "ratings.csv")

    # Set up Spark
    sc = SparkContext("local", "MovieRatings")
    try:
        # parse_line returns None for the header row, so one filter removes it
        # together with anything else that did not parse.
        ratings_rdd = (
            sc.textFile(dataset_file)
            .map(parse_line)
            .filter(lambda x: x is not None)
        )

        # Average per movie from (sum, count) pairs. reduceByKey combines on the map
        # side; groupByKey would shuffle every individual rating.
        averages_rdd = (
            ratings_rdd
            .mapValues(lambda rating: (rating, 1))
            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
            .mapValues(lambda total_count: total_count[0] / total_count[1])
        )

        # take(10) brings back only ten results instead of collecting every movie
        # to the driver. These are the first ten movies Spark returns, not the ten
        # highest-rated ones.
        for movie_id, avg_rating in averages_rdd.take(10):
            print(f"Movie {movie_id} has an average rating of {avg_rating:.2f}")
    finally:
        # Always release the SparkContext so the cell can be re-run.
        sc.stop()

if __name__ == "__main__":
    main()


OSError: Could not find kaggle.json. Make sure it's located in /root/.config/kaggle. Or use the environment method. See setup instructions at https://github.com/Kaggle/kaggle-api/