In [1]:
!pip install pyspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=984300fbf2ba8f7526e5238530bd4030a121a3d642739f1daef0fa13abbcff93
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 k

In [2]:
# Imports

import os
import pyspark
import findspark
import pandas as pd
import csv 
import os
from pyspark.sql import SparkSession
import re
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [3]:
# Kaggle keys

# API credentials must never be stored in the notebook itself: a key that was
# committed in a shared notebook has to be treated as leaked and revoked.
# Values already present in the environment are reused; anything missing is
# asked for interactively (getpass does not echo what is typed).
from getpass import getpass

for _var in ("KAGGLE_USERNAME", "KAGGLE_KEY"):
    if not os.environ.get(_var):
        os.environ[_var] = getpass(f"{_var}: ")

In [4]:
# Download medal dataset

# Kaggle dataset identifier handed to the CLI call below.
ref = "xhlulu/medal-emnlp"

# Shell command: download the dataset archive into the current directory (-p .)
# and unzip it. IPython substitutes $ref with the Python variable defined above.
!kaggle datasets download $ref --unzip -p .


Downloading medal-emnlp.zip to .
100% 6.80G/6.82G [00:46<00:00, 160MB/s]
100% 6.82G/6.82G [00:46<00:00, 158MB/s]


In [5]:
# Define Spark environment

# Build (or reuse, via getOrCreate) a Spark session with master "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [6]:
# Read data

# header = True: the first line of the file supplies the column names
# (TEXT, LOCATION and LABEL in the preview printed by show()).
df = spark.read.csv("full_data.csv", header = True)
# Print a preview of the first rows.
df.show()

+--------------------+--------------------+--------------------+
|                TEXT|            LOCATION|               LABEL|
+--------------------+--------------------+--------------------+
|alphabisabolol ha...|                  56|           substrate|
|a report is given...|24|49|68|113|137|172|carcinosarcoma|re...|
|the virostatic co...|                  55|           substrate|
|rmi rmi and rmi a...|   25|82|127|182|222|compounds|compoun...|
|a doubleblind stu...|22|26|28|77|90|14...|oxazepam|placebo|...|
|stroma from eithe...|         6|82|84|107|red cells|serum|a...|
|the effect of the...|                4|13|major|pentose pho...|
|in one experiment...|        32|44|76|135|feeding|feeding|a...|
|the presence of a...|7|15|63|137|199|2...|active|study|acti...|
|the reaction of g...|     113|203|209|250|stable|assay|bind...|
|choline acetyltra...|                  44|             caudate|
|increasing concen...|                  81|        displacement|
|the properties of...|   

# Preprocessing

- Removal of NA (missing) values
- Making text case-insensitive

In [7]:
# Drop useless variables

# Only the TEXT column is used by the cells that follow, so the two
# annotation columns are discarded here.
df = df.drop("LOCATION", "LABEL")

In [8]:
# Drop Na values and duplicates

# Spark DataFrames are immutable: na.drop() and dropDuplicates() return NEW
# DataFrames, so the result has to be assigned back or the rows are never
# actually removed.
df = df.na.drop().dropDuplicates()
df

DataFrame[TEXT: string]

In [9]:
# Definition of useful functions for cleaning

def remove_digits(tweet):
  '''
    Return a copy of the input string with every digit 0-9 deleted.
  '''
  digit_pattern = r'[0-9]'
  return re.sub(digit_pattern, '', tweet)

def remove_punctuation(tweet):
    '''
      Function that takes as input a string and replaces every punctuation
      character with a single space.

      The characters covered are the ASCII punctuation in string.punctuation
      plus the typographic ellipsis and curly quotes. Each one is replaced by
      a space (not deleted), so the returned string has the same length as
      the input and words joined by punctuation become separate tokens.
    '''
    # The docstring must be the first statement of the function; the import
    # comes after it so that help() / __doc__ actually expose the text above.
    import string

    to_blank = string.punctuation + "…’”“‘"
    # The replacement string is sized from the character set itself, so adding
    # a character above can never desynchronise the two maketrans arguments.
    return tweet.translate(str.maketrans(to_blank, " " * len(to_blank)))

def remove_extra_spaces(tweet):
  '''
    Collapse every run of whitespace in the input into one space and drop
    any leading or trailing whitespace.
  '''
  # split() without arguments already ignores leading/trailing whitespace
  # and treats consecutive whitespace characters as a single separator.
  words = tweet.split()
  return " ".join(words)

# Extra tokens filtered in addition to the NLTK English stop words
# (useless words that seemed to come up very often).
_EXTRA_STOP_WORDS = ("lol", "mkr", "http", "andre", "kat", "co", "rt", "oh")

# Lazily-built set of all stop words; filled on the first call to
# remove_stopwords so the NLTK corpus is read once instead of once per text.
_stop_words_cache = None


def remove_stopwords(tweet):

  '''
    Function that takes as input a string and removes all English stopwords

    The input is split on whitespace, every token that is an NLTK English
    stop word or one of _EXTRA_STOP_WORDS is dropped, and the remaining
    tokens are joined back together with single spaces. Matching is exact,
    so the text is expected to be lower-cased already.
  '''
  global _stop_words_cache

  if _stop_words_cache is None:
    from nltk.corpus import stopwords

    # A frozenset gives constant-time membership tests in the filter below.
    _stop_words_cache = frozenset(stopwords.words("english")) | frozenset(_EXTRA_STOP_WORDS)

  return ' '.join(word for word in tweet.split() if word not in _stop_words_cache)


def cleaning(tweet):

  ''' 
    Function that takes as input a string and applies all preprocessing functions required to fully clean it

    Steps, in order: lower-casing, digit removal, whitespace normalisation,
    punctuation removal and stop-word removal.

    Returns the cleaned string, or None when the input is None (Spark passes
    None to a UDF for null cells, which would otherwise raise AttributeError
    on .lower() and abort the whole job).
  '''

  if tweet is None:
    return None

  tweet = tweet.lower() # standardise all text to lower case

  tweet = remove_digits(tweet)
  tweet = remove_extra_spaces(tweet)
  tweet = remove_punctuation(tweet)
  # remove_stopwords splits on whitespace and re-joins with single spaces, so
  # the blanks introduced by remove_punctuation do not survive in the output.
  tweet = remove_stopwords(tweet)

  return tweet


In [10]:
from pyspark.sql.functions import col, udf

# Wrap the Python cleaning function as a Spark UDF so it can be applied per row.
cleanUDF = udf(lambda x: cleaning(x)) 
# Store the cleaned text under the column name "text".
# NOTE(review): with Spark's default case-insensitive column resolution this
# presumably replaces the existing TEXT column rather than adding a second
# one — confirm with df.columns.
df = df.withColumn("text", cleanUDF(col("TEXT")))

In [18]:
# Add column ID

from pyspark.sql.functions import monotonically_increasing_id

# Attach a numeric "id" column; it is the document key in the shingling step.
# NOTE(review): per the PySpark docs these ids are unique and increasing but
# not consecutive — fine as an opaque key, do not use them as row numbers.
df = df.withColumn("id", monotonically_increasing_id())

In [None]:
# Convert to RDD

# RDD view of the DataFrame; the shingling step accesses each row's fields
# by attribute (x.id, x.text).
rdd = df.rdd

# Shingling

In [29]:
# Number of characters in each shingle (the width of the slice taken from the
# cleaned text in the shingling step).
shingle_length = 4

In [None]:
# Emit one (document id, shingle) pair for every window of shingle_length
# consecutive characters of the cleaned text.
# A text of length n has n - shingle_length + 1 such windows; the "+ 1" keeps
# the final window (without it the last shingle is lost and a text of exactly
# shingle_length characters produces nothing). Texts that are shorter than
# shingle_length, or null, give an empty range and therefore no pairs.
shingles = rdd.flatMap(
    lambda x: [
        (x.id, x.text[i : i + shingle_length])
        for i in range(len(x.text or "") - shingle_length + 1)
    ]
)  # .distinct()

shingles.take(5)