# Step 1: Initialize Spark Session
First, set up your Spark environment to handle large-scale data processing:

Configure Spark so it can handle roughly 20 GB of data.

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=352eea55c0458b163870d3fae3ce34d6f6c36d4ac8922d656a1b6c54d634c153
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

# Build the single SparkSession used by the rest of the notebook.
# NOTE: with master("local[*]") the executors run inside the driver JVM, so
# `spark.driver.memory` is what bounds the heap. The `spark.executor.*` settings
# have no effect in local mode and are kept only for parity with a cluster run.
# Driver memory is applied only when the JVM is launched, i.e. on the first
# getOrCreate() in this kernel.
spark = SparkSession.builder \
    .appName("Language Detection") \
    .master("local[*]") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.memory", "12g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "20g") \
    .config("spark.default.parallelism", "100") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
# KryoSerializer is faster and more compact than the default JavaSerializer
# (recommended by the Spark tuning guide). Class registration is not required.




# Step 2: Load the data
The dataset files are stored on Google Drive, so mount the drive first:

In [None]:
# Mount Google Drive at /content/drive so the dataset files under MyDrive can be read.
import google.colab.drive as drive

drive.mount('/content/drive')

Mounted at /content/drive


Define an explicit schema for loading, which speeds up the process compared with having Spark infer one.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Explicit schema for the review TSV files, listed in column order.
# Every field is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("marketplace", StringType()),
        ("customer_id", StringType()),  # could be IntegerType if every value is numeric — TODO confirm
        ("review_id", StringType()),
        ("product_id", StringType()),
        ("product_parent", StringType()),
        ("product_title", StringType()),
        ("product_category", StringType()),
        ("star_rating", IntegerType()),
        ("helpful_votes", IntegerType()),
        ("total_votes", IntegerType()),
        ("vine", StringType()),
        ("verified_purchase", StringType()),
        ("review_headline", StringType()),
        ("review_body", StringType()),
        ("review_date", DateType()),
    )
])

Load the data files and combine them into a single DataFrame.

In [None]:
from functools import reduce

from pyspark.sql import DataFrame, SparkSession

# Reuse the session configured in Step 1. getOrCreate() returns the session that
# is already running, so any appName(...) set on the builder here would be ignored.
spark = SparkSession.builder.getOrCreate()

# Number of leading rows taken from each category file.
ROWS_PER_FILE = 20000

# List of file paths (one TSV file per product category)
data_paths = [
    '/content/drive/MyDrive/archive/amazon_reviews_us_Apparel_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Automotive_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Baby_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Beauty_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Books_v1_02.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Camera_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Electronics_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Furniture_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Sports_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Grocery_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv',
    '/content/drive/MyDrive/archive/amazon_reviews_us_Music_v1_00.tsv'
]

# Read the first ROWS_PER_FILE rows of each file.
# NOTE: limit() keeps rows in file order; it is not a random sample.
# NOTE(review): Spark's CSV reader treats `"` as a quote character by default.
# If review text contains unbalanced quotes, rows may be merged or mis-split.
# Consider disabling quoting via the `quote` option after checking the raw files.
sampled_dfs = []
for path in data_paths:
    file_df = spark.read.csv(path, schema=schema, sep='\t', header=True)
    sampled_dfs.append(file_df.limit(ROWS_PER_FILE))

# Union all per-file DataFrames into one. They all share `schema`, so the
# positional union() lines the columns up correctly.
df = reduce(DataFrame.union, sampled_dfs)

# Show some data
df.show()

# `df` now holds up to ROWS_PER_FILE rows from each file in data_paths.


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32158956|R1KKOXHNI8MSXU|B01KL6O72Y|      24485154|Easy Tool Stainle...|         Apparel|          4|            0|          0|   N|                Y|★ THESE REALLY DO...|These Really Do W...| 2013-01-14|
|         US|    2714559|R26SP2OPDK4HT7|B01ID3ZS5W|     363128556|V28 Women Cowl Ne...|         Apparel|          5|    

Check how much RAM the runtime has

In [None]:
from psutil import virtual_memory

# psutil's virtual_memory() describes system memory: `.total` is the installed
# RAM and `.available` is what can currently be handed to new processes
# (for example Spark's JVM).
mem = virtual_memory()
ram_gb = mem.total / 1e9
available_gb = mem.available / 1e9
print('Your runtime has {:.1f} gigabytes of total RAM ({:.1f} GB currently available)\n'.format(ram_gb, available_gb))

# The high-RAM check is based on total installed memory.
if ram_gb < 20:
    print('Not using a high-RAM runtime')
else:
    print('You are using a high-RAM runtime!')

Your runtime has 89.6 gigabytes of available RAM

You are using a high-RAM runtime!


# Step 3: Data Preprocessing

Implement steps to clean and preprocess the data, ensuring it is ready for analysis or machine learning.

In [None]:
from pyspark.sql.functions import col, lower, regexp_replace, concat_ws, udf, substring, row_number

# Maximum number of characters of combined review text kept per row.
MAX_TEXT_CHARS = 512

# Remove exact duplicate rows (all columns are compared)
df = df.dropDuplicates()

# Replace missing review_body / review_headline values with placeholder text
df = df.na.fill({
    "review_body": "No review text",
    "review_headline": "No review headline"
})

# Combine headline and body as "<headline>. <body>" (no text cleaning is applied)
df = df.withColumn("full_text", concat_ws(". ", "review_headline", "review_body"))

# Truncate full_text to its first MAX_TEXT_CHARS characters (substring positions are 1-based)
df = df.withColumn("full_text", substring(col("full_text"), 1, MAX_TEXT_CHARS))

# Keep only the columns needed downstream
df = df.drop("marketplace", "review_id", "helpful_votes", "total_votes", "vine", "verified_purchase", "review_body", "review_headline", "review_date")

# Deliberately not cached here: the next step repartitions `df` and caches that
# result. Caching this intermediate plan too would keep a second in-memory copy.
df

DataFrame[customer_id: string, product_id: string, product_parent: string, product_title: string, product_category: string, star_rating: int, full_text: string]

In [None]:
# Use the session's configured shuffle partition count. With 2000 partitions,
# a ~240k-row DataFrame yields ~120 rows per task, so scheduling overhead dominates.
num_partitions = int(spark.conf.get("spark.sql.shuffle.partitions"))
df = df.repartition(num_partitions).cache()
# count() is an action: it forces evaluation so the cache is populated now.
df.count()

240000

In [None]:
# Use Apache Arrow for the Spark -> pandas transfer (columnar batches instead of
# pickled rows). If Arrow cannot be used (e.g. pyarrow missing), Spark falls back
# to the regular path because spark.sql.execution.arrow.pyspark.fallback.enabled
# defaults to true.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
local_data = df.toPandas()

In [None]:
!pip install wget

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=64c02e7d9c1390f1ad912a21510c5edff83b267b1ba48f95b8f653eb8b47cfab
  Stored in directory: /root/.cache/pip/wheels/8b/f1/7f/5c94f0a7a505ca1c81cd1d9208ae2064675d97582078e6c769
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2


In [None]:
pip install fasttext

Collecting fasttext
  Downloading fasttext-0.9.2.tar.gz (68 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/68.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.8/68.8 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pybind11>=2.2 (from fasttext)
  Using cached pybind11-2.12.0-py3-none-any.whl (234 kB)
Building wheels for collected packages: fasttext
  Building wheel for fasttext (setup.py) ... [?25l[?25hdone
  Created wheel for fasttext: filename=fasttext-0.9.2-cp310-cp310-linux_x86_64.whl size=4227138 sha256=b4120260ac8ae62d4ca2718ed4e5bde42b1b9f045f214ea2ad16912be113b760
  Stored in directory: /root/.cache/pip/wheels/a5/13/75/f811c84a8ab36eedbaef977a6a58a98990e8e0f1967f98f394
Successfully built fasttext
Installing collected packages: pybind11, fasttext
Successfully installed fasttext-0.9.2 pybind11-2.12.0


# Step 4: Language Detection

The `lid.176.bin` model is a pre-trained language identification tool from Facebook's AI Research (FAIR) team, part of the FastText library. It effectively distinguishes among 176 languages, leveraging subword information from texts to provide high accuracy, even with short text snippets. Renowned for its speed and efficiency, `lid.176.bin` is ideal for applications requiring rapid and reliable language detection, such as routing texts to language-specific processing pipelines in real-time systems. This makes it a valuable resource for global platforms handling diverse, multilingual data sets.

In [None]:
# Download fastText's pre-trained language-identification model (lid.176.bin)
# once, then load it. The standard library's urllib does the download, so the
# third-party `wget` package is not needed here.
import os
import urllib.request

import fasttext

model_url = 'https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin'
model_path = 'lid.176.bin'

# Skip the download if the model is already present. The file is fetched into
# a ".part" file and renamed only after the transfer completes, so an
# interrupted download never leaves a truncated file at model_path.
if not os.path.exists(model_path):
    print("Downloading the model...")
    partial_path = model_path + '.part'
    urllib.request.urlretrieve(model_url, partial_path)
    os.replace(partial_path, model_path)
else:
    print("Model already exists.")

# Every later step needs the model. A load failure is reported and then
# re-raised, instead of letting the notebook continue with `lang_model` undefined.
try:
    lang_model = fasttext.load_model(model_path)
    print("Model loaded successfully.")
except Exception as e:
    print("Failed to load model:", e)
    raise


Downloading the model...
Model loaded successfully.




In [None]:
import fasttext
import pandas as pd

# Load the language detection model
model = fasttext.load_model('lid.176.bin')

# Define a function to detect language using FastText
def detect_language(text):
    """Return the top language label that the fastText model predicts for `text`.

    fastText prefixes every label with "__label__"; that prefix is stripped
    before returning. fastText's predict() rejects input containing newline
    characters (it scores a single line), so newlines are first replaced with
    spaces.
    """
    single_line = text.replace("\n", " ")
    predictions = model.predict(single_line)
    return predictions[0][0].replace("__label__", "")  # Cleaning the label

# Apply language detection
local_data['language'] = local_data['full_text'].apply(detect_language)




In [None]:
# Convert the pandas frame (now including the `language` column) back into a Spark DataFrame.
final_df = spark.createDataFrame(data=local_data)

In [None]:
# Continue processing or save the results
final_df.write.format("parquet").save("/content/drive/MyDrive/Big Data Project/language.parquet", mode='overwrite')


In [None]:
# Preview a small sample. Printing 2000 rows of untruncated review text floods the notebook output.
final_df.show(n=20, truncate=100)

# Number of rows per detected language, most frequent first.
final_df.groupBy("language").count().orderBy("count", ascending=False).show()

+-----------+----------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
