<a href="https://colab.research.google.com/github/YomnaaAshraf/ANN-project/blob/main/big_data_project_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pyspark

In [2]:
from pyspark.sql.functions import count, when, isnull

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, requesting 4 GB for executors and for the driver.
spark = (
    SparkSession.builder
    .appName("Big Data Project")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [None]:
# Load the pharma CSV from HDFS: the first row supplies the column names and
# column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("hdfs://namenode:9000/pharma_data/pharma_data.csv")
)

In [None]:
# Down-sample to roughly TARGET_ROWS rows. DataFrame.sample() draws each row
# independently, so the resulting size is approximate, not exact.
TARGET_ROWS = 10000
total_rows = df.count()

# Sampling without replacement requires a fraction in [0, 1]:
#   - clamp to 1.0 when the dataset has fewer rows than the target
#     (the unclamped ratio would be > 1 and sample() would raise);
#   - fall back to 0.0 for an empty dataset instead of dividing by zero.
fraction = min(1.0, TARGET_ROWS / total_rows) if total_rows else 0.0
df = df.sample(withReplacement=False, fraction=fraction, seed=42)

In [None]:
# Preview the sampled DataFrame (show() prints the first 20 rows by default).
df.show()

In [None]:
# Per-column null tally: when() emits a value only for null cells, and count()
# counts just those emitted values.
df.select(*[count(when(isnull(name), name)).alias(name) for name in df.columns]).show()

# Drop rows that are exact duplicates across every column.
df = df.dropDuplicates()

# Report how many rows remain after de-duplication.
row_total = df.count()
print(f"Shape after removing duplicates: {row_total}")

In [None]:
from pyspark.sql.functions import lower, trim

# Normalise 'mapped_composition': lowercase first, then trim surrounding spaces.
normalized_composition = trim(lower(df.mapped_composition))
df = df.withColumn("mapped_composition", normalized_composition)

df.show(5)

In [None]:
from pyspark.sql.functions import regexp_extract

# Capture the first run of digits in 'pack_size_label' and store it as an
# integer column named 'pack_size'.
first_digits = regexp_extract(df.pack_size_label, r"(\d+)", 1)
df = df.withColumn("pack_size", first_digits.cast("int"))

df.show(5)

In [None]:
from pyspark.sql.functions import col

# Exact (relativeError=0.0) 1st and 99th percentiles of 'price'.
quantiles = df.approxQuantile("price", [0.01, 0.99], 0.0)

# approxQuantile returns an empty list when the column has no non-null values
# (for example an empty sample), so unpacking it unconditionally would raise.
if len(quantiles) == 2:
    lower_bound, upper_bound = quantiles

    # Cap 'price' at the percentile bounds. Extreme values are replaced by the
    # nearest bound rather than dropped; null prices fall through to otherwise()
    # and stay null.
    df = df.withColumn(
        "price",
        when(col("price") < lower_bound, lower_bound)
        .when(col("price") > upper_bound, upper_bound)
        .otherwise(col("price")),
    )
else:
    lower_bound = upper_bound = None
    print("Skipping price clipping: no non-null 'price' values to compute percentiles from.")

df.show(5)

In [None]:
from pyspark.sql.functions import col

# Re-type 'Is_discontinued' as an integer column.
# NOTE(review): this yields 0/1 only if the column was inferred as boolean (or is
# already numeric); string values such as "TRUE" would presumably cast to null.
# Confirm the inferred schema.
discontinued_as_int = df["Is_discontinued"].cast("int")
df = df.withColumn("Is_discontinued", discontinued_as_int)

df.show(5)

In [None]:
import matplotlib.pyplot as plt

# Bring the per-composition row counts to the driver as a pandas frame.
value_counts = df.groupBy("mapped_composition").count().toPandas()

# Draw one bar per composition on an explicitly created 15x5 axes.
fig, ax = plt.subplots(figsize=(15, 5))
value_counts.plot(kind='bar', x='mapped_composition', y='count', legend=False, ax=ax)
ax.set_title("Distribution of Mapped Composition")
ax.set_xlabel("Mapped Composition")
ax.set_ylabel("Count")
plt.show()

In [None]:
!pip install imbalanced-learn

In [None]:
import pandas as pd

In [None]:
# Minimum number of rows a composition needs in order to be kept as a class.
MIN_CLASS_COUNT = 100

# Count occurrences of each 'mapped_composition' value.
mapped_composition_counts = df.groupBy("mapped_composition").count()

# Classes with fewer than MIN_CLASS_COUNT occurrences are considered rare.
rare_classes = mapped_composition_counts.filter(
    mapped_composition_counts['count'] < MIN_CLASS_COUNT
)

# Rows with a null composition cannot serve as a class label, and a null join key
# never matches in the anti-join below, so they would always survive the filter.
# Drop them explicitly first.
df_labeled = df.filter(df["mapped_composition"].isNotNull())

# Anti-join: keep only the rows whose composition is NOT in the rare set.
df_filtered_spark = df_labeled.join(rare_classes, on="mapped_composition", how="left_anti")

# Report (row count, column count) of the filtered DataFrame.
filtered_shape = df_filtered_spark.count(), len(df_filtered_spark.columns)
print(f"Filtered shape: {filtered_shape}")

In [None]:
# Per-composition row counts of the filtered data, collected into pandas.
value_counts = df_filtered_spark.groupBy("mapped_composition").count().toPandas()

# Bar chart of the remaining classes on an explicit 15x5 axes.
fig, ax = plt.subplots(figsize=(15, 5))
value_counts.plot.bar(x='mapped_composition', y='count', legend=False, ax=ax)
ax.set_title("Distribution of Mapped Composition")
ax.set_xlabel("Mapped Composition")
ax.set_ylabel("Count")
plt.show()

In [None]:
# Imbalance ratio = size of the largest class divided by size of the smallest.
class_sizes = value_counts['count']
max_count, min_count = class_sizes.max(), class_sizes.min()
imbalance_ratio = max_count / min_count
print(f"Imbalance Ratio: {imbalance_ratio:.2f}")

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
# Fit a StringIndexer on 'pack_size_label' and append the resulting numeric
# index as 'pack_size_encoded'.
indexer = StringIndexer(inputCol="pack_size_label", outputCol="pack_size_encoded")
indexer_model = indexer.fit(df_filtered_spark)
df_filtered_spark = indexer_model.transform(df_filtered_spark)

In [None]:
# Collect only the modelling columns to the driver as a pandas DataFrame.
feature_columns = ['price', 'pack_size_encoded']
df_pandas = df_filtered_spark.select(*feature_columns, "mapped_composition").toPandas()

# Feature matrix (X) and class labels (y).
X = df_pandas[feature_columns]
y = df_pandas['mapped_composition']

In [None]:
from imblearn.over_sampling import SMOTE

In [None]:
# Oversample with SMOTE (fixed seed for reproducibility).
smote = SMOTE(random_state=42)
resampled_pair = smote.fit_resample(X, y)
X_resampled, y_resampled = resampled_pair

# Report the shape of the resampled feature matrix.
print(f"Resampled dataset shape: {X_resampled.shape}")

In [None]:
import pandas as pd

# Wrap the resampled labels in a one-column DataFrame.
y_resampled_df = pd.DataFrame(y_resampled, columns=["mapped_composition"])

# Tally how many rows each class has after resampling, then display the tally.
class_counts = y_resampled_df.mapped_composition.value_counts()
print(class_counts)

In [None]:
!pip install xgboost

In [None]:
from imblearn.over_sampling import SMOTE
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBClassifier

# Encode the string class labels as integer class ids.
# LabelEncoder expects 1-D input, so pass the label column (a Series) rather than
# the one-column DataFrame.
label_encoder = LabelEncoder()
# Encoded labels of the globally resampled data. Kept so that any later cell that
# references this name keeps working; it is NOT used for training below.
y_sampled_encoded = label_encoder.fit_transform(y_resampled_df["mapped_composition"])

# Encoded labels of the original (pre-SMOTE) rows; these are what get split.
y_encoded = label_encoder.transform(y)

# Split BEFORE oversampling. If SMOTE runs on the full dataset first, synthetic
# training points are interpolated from rows that end up in the test set, which
# leaks test information into training and inflates the reported metrics.
# Stratify so every class appears in both folds.
X_train_raw, X_test, y_train_raw, y_test = train_test_split(
    X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
)

# Oversample the training fold only; the test fold keeps the real class distribution.
X_train, y_train = SMOTE(random_state=42).fit_resample(X_train_raw, y_train_raw)

# Initialize the XGBoost classifier.
xgb_classifier = XGBClassifier(n_estimators=100, learning_rate=0.1, max_depth=6, random_state=42)

# Train the model.
xgb_classifier.fit(X_train, y_train)

# Predict on the untouched test fold.
y_pred_xgb = xgb_classifier.predict(X_test)

# Decode the predictions back to the original string labels.
y_pred_xgb_original = label_encoder.inverse_transform(y_pred_xgb)

# Evaluate using numeric labels.
print("XGBoost Accuracy:", accuracy_score(y_test, y_pred_xgb))
print("\nXGBoost Classification Report (Numeric Labels):")
print(classification_report(y_test, y_pred_xgb))