In [2]:
# Initialization
import os
import sys

# Locations of the local Spark installation and of its bundled Python libraries.
spark_home = "/home/talentum/spark"
py_lib = spark_home + "/python/lib"

os.environ.update({
    "SPARK_HOME": spark_home,
    "PYLIB": py_lib,
    # Interpreters handed to PySpark. Point both at /usr/bin/python2.7 to run on Python 2.
    # NOTE(review): the worker is pinned to python3.6 while the driver uses python3 --
    # confirm both resolve to the same minor version on this machine.
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Make the bundled pyspark and py4j archives importable. They go to the front of
# sys.path with pyspark.zip ahead of the py4j archive.
sys.path[:0] = [py_lib + "/pyspark.zip", py_lib + "/py4j-0.10.7-src.zip"]

# Extra packages for the PySpark shell: list every package you need after --packages.
# Alternatives that were tried:
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [3]:
# Spark 2.x entry point: a Hive-enabled SparkSession.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Spark SQL basic example")
    .enableHiveSupport()
    # To run on YARN, add .master("yarn") to this chain before getOrCreate().
    .getOrCreate()
)

# SparkContext of the session, used for the RDD API.
sc = spark.sparkContext

In [4]:
import re
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

file_path = "file:///home/talentum/kafka_data1/train_trim.txt"

# Two nullable string columns: the label token and the remaining review text.
unstSchema = StructType([
    StructField('Label', StringType(), True),
    StructField('Review', StringType(), True)
])

baseRDD = sc.textFile(file_path)


def _split_at_first_space(line):
    """Split a line at its first space, giving at most two pieces."""
    return re.split(r" ", line, maxsplit=1)


def _parts_to_row(parts):
    """Build a Row from a [label, review] pair, stripping whitespace from both."""
    label, review = parts
    return Row(label.strip(), review.strip())


# Drop blank lines, then keep only lines that split into exactly two pieces.
non_empty_lines = baseRDD.map(lambda line: line.strip()).filter(lambda line: line != "")
label_review_pairs = non_empty_lines.map(_split_at_first_space).filter(
    lambda parts: len(parts) == 2
)
rdd = label_review_pairs.map(_parts_to_row)

df1 = spark.createDataFrame(rdd, schema=unstSchema)

df1.printSchema()
df1.show(8, truncate=False)



root
 |-- Label: string (nullable = true)
 |-- Review: string (nullable = true)

+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Label     |Review                                                              

In [8]:
# English stop words removed from the review text (all lowercase, order preserved).
# NOTE(review): the list includes negations ('no', 'nor', 'not') -- confirm that
# dropping them is intended for rating/sentiment data.
stop_words = [
    # pronouns
    "i", "me", "my", "myself", "we", "our", "ours", "ourselves",
    "you", "your", "yours", "yourself", "yourselves",
    "he", "him", "his", "himself", "she", "her", "hers", "herself",
    "it", "its", "itself", "they", "them", "their", "theirs", "themselves",
    # interrogatives and demonstratives
    "what", "which", "who", "whom", "this", "that", "these", "those",
    # auxiliary verbs
    "am", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "having", "do", "does", "did", "doing",
    # articles and conjunctions
    "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while",
    # prepositions
    "of", "at", "by", "for", "with", "about", "against", "between", "into",
    "through", "during", "before", "after", "above", "below", "to", "from",
    "up", "down", "in", "out", "on", "off", "over", "under",
    # adverbs
    "again", "further", "then", "once", "here", "there",
    "when", "where", "why", "how",
    # quantifiers and qualifiers
    "all", "any", "both", "each", "few", "more", "most", "other", "some", "such",
    "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very",
    # remaining words
    "can", "will", "just", "don", "should", "now",
]

In [9]:
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import (
    col, lower, split, array_except, concat_ws, array, lit, coalesce
)

# Stop words as a Spark array column. This cell no longer uses it; it is
# retained in case other cells reference it.
stop_words_col = array(*[lit(w) for w in stop_words])

# Lowercase each review and split it on single spaces. A null Review is treated
# as an empty string so that it still ends up as "" in the result.
df_tokens = df1.withColumn(
    "_review_tokens",
    split(lower(coalesce(col("Review"), lit(""))), " ")
)

# array_except() returns its result WITHOUT duplicates, so using it to drop stop
# words also collapsed repeated words inside a review ("ok ok product" became
# "ok product"). StopWordsRemover filters token by token, so word order and
# repetitions are kept.
# NOTE(review): punctuation is only stripped in a later cell, so tokens such as
# "the," are not recognised as stop words here -- consider stripping it first.
stop_words_remover = StopWordsRemover(
    inputCol="_review_tokens",
    outputCol="_review_kept",
    stopWords=stop_words
)

# Re-join the surviving tokens into the Review column and drop the helper
# columns, leaving the same columns as df1.
df_clean = (
    stop_words_remover.transform(df_tokens)
    .withColumn("Review", concat_ws(" ", col("_review_kept")))
    .drop("_review_tokens", "_review_kept")
)

df_clean.show(8, truncate=False)



+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Label     |Review                                                                                                                                                                                                                                                                                                                                                                                                                            

In [11]:
from pyspark.sql.functions import regexp_replace

# Delete every character that is neither a lowercase ASCII letter nor whitespace.
# (`col` comes from the pyspark.sql.functions import in an earlier cell.)
non_letter_pattern = r"[^a-z\s]"
review_letters_only = regexp_replace(col("Review"), non_letter_pattern, "")

df_new = df_clean.withColumn("Review", review_letters_only)
df_new.show(8, truncate=False)

+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Label     |Review                                                                                                                                                                                                                                                                                                                                                                                                                                                                 

In [16]:
# from pyspark.sql.functions import trim
# df_trim = df_new.withColumn("Review", trim(col("Review")))
# df_trim.show(1)

In [20]:
from pyspark.sql.functions import col, when

# Map the text label to a numeric rating:
#   "__label__2" -> 4, "__label__1" -> 2, anything else -> 3.
label = col("Label")
rating_from_label = (
    when(label == "__label__2", 4)
    .when(label == "__label__1", 2)
    .otherwise(3)
)

df_transform = df_new.withColumn("Rating", rating_from_label)

df_transform.show(8, truncate=False)

                               

+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|Label     |Review                                                                                                                                                                                                                                                                                                                                                                                                                                                          

In [23]:
# The label has been converted to Rating, so drop the original Label column.
df_processed_txt = df_transform.drop("Label")

# Preview the first 8 rows (long values are truncated by default).
df_processed_txt.show(n=8)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|stuning even nong...|     4|
|best soundtrack e...|     4|
|amazing soundtrac...|     4|
|excellent soundtr...|     4|
|remember pull jaw...|     4|
|absolute masterpi...|     4|
|buyer beware self...|     2|
|glorious story lo...|     4|
+--------------------+------+
only showing top 8 rows



In [None]:
## Processing CSV data

In [27]:
# Structured reviews: the first line is the header and column types are inferred.
csv_path = "file:///home/talentum/kafka_data1/structured.csv"
df_csv = spark.read.csv(csv_path, header=True, inferSchema=True)

df_csv.show(n=8)

+----+--------------------+
|Rate|             Summary|
+----+--------------------+
|   5|great cooler exce...|
|   5|best budget 2 fit...|
|   3|the quality is go...|
|   1|very bad product ...|
|   3|       ok ok product|
|   5|the cooler is rea...|
|   5|   very good product|
|   3|           very nice|
+----+--------------------+
only showing top 8 rows



In [30]:

# Rename the CSV columns to the names used by the processed text data.
df_rename = (
    df_csv
    .withColumnRenamed("Rate", "Rating")
    .withColumnRenamed("Summary", "Review")
)

# Column names with the first two swapped. Despite its name, df_swap is a list
# of column names, not a DataFrame.
renamed_cols = df_rename.columns
df_swap = [renamed_cols[1], renamed_cols[0]] + renamed_cols[2:]

df_neww = df_rename.select(df_swap)
df_neww.show(n=8)




+--------------------+------+
|              Review|Rating|
+--------------------+------+
|great cooler exce...|     5|
|best budget 2 fit...|     5|
|the quality is go...|     3|
|very bad product ...|     1|
|       ok ok product|     3|
|the cooler is rea...|     5|
|   very good product|     5|
|           very nice|     3|
+--------------------+------+
only showing top 8 rows



In [36]:


from pyspark.sql.functions import col, trim, regexp_replace

# Step 1: strip leading whitespace, including non-breaking spaces (U+00A0).
leading_blank_pattern = r"^[\s\u00A0]+"
review_without_leading_blanks = regexp_replace(col("Review"), leading_blank_pattern, "")

# Step 2: trim() removes the space characters left at either end.
# NOTE(review): trailing tabs or non-breaking spaces are not removed by either
# step -- confirm the data has none, or extend the pattern to the end of the string.
df_trim = df_neww.withColumn("Review", trim(review_without_leading_blanks))

df_trim.show(8, truncate=False)


+----------------------------------------------------------------------------------------------+------+
|Review                                                                                        |Rating|
+----------------------------------------------------------------------------------------------+------+
|great cooler excellent air flow and for this price its so amazing and unbelievablejust love it|5     |
|best budget 2 fit cooler nice cooling                                                         |5     |
|the quality is good but the power of air is decent                                            |3     |
|very bad product its a only a fan                                                             |1     |
|ok ok product                                                                                 |3     |
|the cooler is really fantastic and provides good air flow highly recommended                  |5     |
|very good product                                              

In [None]:
## Merging the two datasets

In [38]:
# Stack the processed text reviews on top of the CSV reviews, matching columns
# by name rather than by position.
# NOTE(review): df_trim was only trimmed; its reviews still contain stop words
# and digits, unlike df_processed_txt -- confirm the mixed preprocessing is intended.
df_merged = df_processed_txt.unionByName(df_trim)

# Total number of rows in the merged dataset.
df_merged.count()

405052