# NATURAL LANGUAGE PROCESSING

## Setup

In [2]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.2.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.3.1
  latest version: 23.10.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.10.0



# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [3]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)



:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0f88f231-4e20-4470-9f52-ec008753752b;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 294ms :: artifacts dl 16ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------

3.2.0


In [4]:
bucket = "project-group34"

## Import Libraries

In [5]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


## Import Data

In [6]:
%%time
session = sagemaker.Session()
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path, header=True)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading comments from s3a://project-group34/project/comments/yyyy=*


23/11/19 21:49:17 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

CPU times: user 63.1 ms, sys: 12 ms, total: 75.1 ms
Wall time: 6.9 s


In [6]:
comments = comments.cache()

In [8]:
comments.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [29]:
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality").show()

+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|    Animesuggest|      [deleted]|           [deleted]| t3_m3vnjl|gqscjse|2021-03-13 10:18:25|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| 

In [7]:
# Filter out rows where 'body' or 'author' is '[deleted]'
comments_filtered = comments.filter((comments.body != '[deleted]') & (comments.author != '[deleted]'))

# Show the filtered DataFrame
comments_filtered = comments_filtered.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality")

In [9]:
comments_filtered.show()

[Stage 1:>                                                          (0 + 1) / 1]

+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| 

                                                                                

In [10]:
!pip install spacy

Collecting spacy
  Obtaining dependency information for spacy from https://files.pythonhosted.org/packages/d6/c8/d659de66da2be05ad10fe029954a90a5fb6ddffb75aaab6702173e9dd0c9/spacy-3.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Using cached spacy-3.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (25 kB)
Collecting spacy-legacy<3.1.0,>=3.0.11 (from spacy)
  Using cached spacy_legacy-3.0.12-py2.py3-none-any.whl (29 kB)
Collecting spacy-loggers<2.0.0,>=1.0.0 (from spacy)
  Obtaining dependency information for spacy-loggers<2.0.0,>=1.0.0 from https://files.pythonhosted.org/packages/33/78/d1a1a026ef3af911159398c939b1509d5c36fe524c7b644f34a5146c4e16/spacy_loggers-1.0.5-py3-none-any.whl.metadata
  Using cached spacy_loggers-1.0.5-py3-none-any.whl.metadata (23 kB)
Collecting murmurhash<1.1.0,>=0.28.0 (from spacy)
  Obtaining dependency information for murmurhash<1.1.0,>=0.28.0 from https://files.pythonhosted.org/packages/a8/ca/359ae4246cccaf3

In [11]:
!python -m spacy download en_core_web_sm

Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m38.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: en-core-web-sm
Successfully installed en-core-web-sm-3.7.1
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [89]:
# import spacy
# import re
# from pyspark.sql.functions import udf
# from pyspark.sql.types import ArrayType, StringType

# # Load the NLP model once
# nlp = spacy.load("en_core_web_sm")

# def extract_movie_names(text):
    
#     doc = nlp(text)

#     movie_positions = []
#     movie_names = []
    
#     # Extract named entities from the context window
#     for ent in doc.ents:
#         if ent.label_ == "ORG" or ent.label_ == "PERSON": 
#             movie_positions.append((ent.start_char, ent.end_char))
#             movie_names.append(ent.text)
            
#     return movie_positions, movie_names

# def remove_movie_names(text, movie_positions):
#     # Sort positions in reverse order to avoid shifting positions during removal
#     movie_positions = sorted(movie_positions, key=lambda x: x[0], reverse=True)

#     # Remove movie names from the text
#     for start, end in movie_positions:
#         text = text[:start] + ' ' + text[end:]

#     # Remove extra spaces
#     text = ' '.join(text.split())

#     return text

# def extract_movie_names_regex(text, movie_names):
#     # Define a regex pattern for movie names
#     movie_name_pattern = r"(?:'([^']+)'|\"([^\"]+)\"|([A-Z][a-z]*(?: [A-Z0-9][a-z0-9]*)*)(?: \(\d{4}\))?)"

#     # Find matches in the text using the regex pattern
#     movie_matches = re.findall(movie_name_pattern, text)

#     # Combine the results from the capturing groups
#     movies = [match[0] or match[1] or match[2] for match in movie_matches]
#     movie_names = movie_names + movies

#     return movie_names

# # Create a UDF
# extract_movie_names_udf = udf(lambda text: extract_movie_names(text, nlp), ArrayType(StringType()))

# # Applying the UDF to your DataFrame
# df_with_movie_names = comments_filtered.withColumn("movie_names", extract_movie_names_udf(comments_filtered["body"]))
# df_with_movie_names.show()


In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

import spacy
import re

# Load spaCy model
nlp = spacy.load("en_core_web_sm")

# Define schema for the UDF output
movie_schema = StructType([
    StructField("movie_positions", ArrayType(ArrayType(StringType()))),
    StructField("movie_names", ArrayType(StringType()))
])

# UDF to extract movie names
@udf(movie_schema)
def extract_movie_names_udf(text):
    doc = nlp(text)
    movie_positions = []
    movie_names = []

    for ent in doc.ents:
        if ent.label_ == "ORG" or ent.label_ == "PERSON":
            movie_positions.append([ent.start_char, ent.end_char])
            movie_names.append(ent.text)

    return (movie_positions, movie_names)

# UDF to remove movie names
@udf(StringType())
def remove_movie_names_udf(text, movie_positions):
    # Reverse sort positions to avoid shifting positions
    if movie_positions:
        movie_positions = sorted([(int(start), int(end)) for start, end in movie_positions], key=lambda x: x[0], reverse=True)

        for start, end in movie_positions:
            text = text[:start] + ' ' + text[end:]

        return ' '.join(text.split())
    
    else:
        return text

# UDF to extract movie names using regex
@udf(ArrayType(StringType()))
def extract_movie_names_regex_udf(text, movie_names):
    movie_name_pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'

    movie_matches = re.findall(movie_name_pattern, text)
    movies = [match[0] or match[1] or match[2] for match in movie_matches]
    return movie_names + movies

# Applying the UDFs to the DataFrame
df_with_movie_data = comments_filtered.withColumn("movie_data", extract_movie_names_udf(comments_filtered["body"]))
df_removed_movie_names = df_with_movie_data.withColumn("body_no_movies", remove_movie_names_udf(comments["body"], df_with_movie_data["movie_data.movie_positions"]))
df_final = df_removed_movie_names.withColumn("movie_names", extract_movie_names_regex_udf(df_removed_movie_names["body_no_movies"], df_removed_movie_names["movie_data.movie_names"]))

In [13]:
# If you want to print the first row as a dictionary for better readability
first_row_dict = df_final.first().asDict()
print(first_row_dict)

[Stage 2:>                                                          (0 + 1) / 1]

{'subreddit': 'Animesuggest', 'author': 'Athenza', 'body': '{Now and Then, Here and There}', 'parent_id': 't3_m3ygv3', 'id': 'gqscelh', 'created_utc': datetime.datetime(2021, 3, 13, 10, 15, 52), 'score': 2, 'controversiality': 0, 'movie_data': Row(movie_positions=[], movie_names=[]), 'body_no_movies': '{Now and Then, Here and There}', 'movie_names': ['Now and Then', 'Here and There']}


Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

In [92]:
df_final.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "movie_names").show()

[Stage 3:>                                                          (0 + 1) / 1]

+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|         movie_names|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|[Now and Then, He...|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|[Genres, Sci-Fi, ...|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|     [Jacobs Ladder]|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|[{Ping Pong the A...|
|    Animesuggest|    Drops

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

In [10]:
df_final = df_final.cache()

23/11/19 21:50:02 WARN CacheManager: Asked to cache already cached data.


In [11]:
stop_words = set(["a", "an", "the", "and", "or", "but", "is", "are", "was", "were", "be", "been", "being", "there", "he", "she"])  # Example stop words

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def remove_stop_word_from_movie_names(movies):
    if movies and len(movies[0].split()) == 1 and movies[0].lower() in stop_words:
        return movies[1:]
    return movies

remove_stop_word_udf = udf(remove_stop_word_from_movie_names, ArrayType(StringType()))

In [13]:
df_final = df_final.withColumn("movie_names", remove_stop_word_udf(df_final["movie_names"]))

In [14]:
df_final_sample = df_final.limit(100)

In [15]:
from pyspark.sql.functions import explode, col, count

# Flatten the movie_names column
df_flattened = df_final_sample.withColumn("movie_name", explode(col("movie_names")))

# Group by movie_name and count the occurrences
df_movie_frequency = df_flattened.groupBy("movie_name").agg(count("*").alias("frequency"))

In [None]:
# Show the resulting DataFrame
df_movie_frequency_pd = df_movie_frequency.toPandas()

[Stage 1:>                                                        (0 + 4) / 241]
KeyboardInterrupt



In [None]:
spark.stop()

In [2]:
!mkdir -p ./code

In [None]:
%%writefile ./code/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")    
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, help="s3 output prefix")
    parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
    parser.add_argument("--values_to_keep", type=str, help="comma separated list of values to keep in the filtered set")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

   
    # Downloading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path, header=True)
    logger.info(f"finished reading files...")
    

    
    # filter the dataframe to only keep the values of interest
    vals = [s.strip() for s in args.values_to_keep.split(",")]
    df_filtered = df.where(col(args.col_name_for_filtering).isin(vals))
    
    # save the filtered dataframes so that these files can now be used for future analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    logger.info(f"going to write data for {vals} in {s3_path}")
    logger.info(f"shape of the df_filtered dataframe is {df_filtered.count():,}x{len(df_filtered.columns)}")
    df_filtered.write.mode("overwrite").parquet(s3_path)
    
    logger.info(f"all done...")
    
if __name__ == "__main__":
    main()

In [None]:
%%time
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = bucket_name
output_prefix_logs = f"spark_logs"

configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
    }
]

# the dataset contains data for these 3 years
year_list = [2021,2022,2023]

In [None]:
%%time
for yyyy in year_list:
    print(f"going to filter comments data for year={yyyy}")
    s3_dataset_path_commments = f"s3://bigdatateaching/reddit-parquet/comments/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/comments/yyyy=*/mm=*/*comments*.parquet"
    output_prefix_data_comments = f"project/comments/yyyy={yyyy}"
    col_name_for_filtering = "subreddit"
    subreddits = "MovieSuggestions, televisionsuggestions, Animesuggest"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
            "--s3_dataset_path",
            s3_dataset_path_commments,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_comments,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)