<a href="https://colab.research.google.com/github/MauricioFBL/capstone_project/blob/main/notebooks/transformations_casptone.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
#!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"

import findspark
findspark.init()
findspark.find()

from pyspark.sql import DataFrame, SparkSession 
from pyspark.sql.functions import *


spark = SparkSession \
       .builder \
       .appName("Our First Spark example") \
       .getOrCreate()

spark

In [9]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

from pyspark.sql.functions import *

In [3]:
# Load the raw movie reviews; the first line is a header and column types are inferred.
movie_review_df = spark.read.csv(
    'movie_review.csv',
    header="true",
    inferSchema="true",
)

In [4]:
# Preview the first five rows of the freshly loaded reviews.
movie_review_df.show(5)

+-----+--------------------+---------+
|  cid|          review_str|id_review|
+-----+--------------------+---------+
|13756|Once again Mr. Co...|        1|
|15738|This is an exampl...|        2|
|15727|First of all I ha...|        3|
|17954|Not even the Beat...|        4|
|16579|Brass pictures (m...|        5|
+-----+--------------------+---------+
only showing top 5 rows



In [5]:
# Break each review string into an array of tokens stored in a new 'Words' column.
tokenizer = Tokenizer(
    outputCol='Words',
    inputCol='review_str',
)
movie_review_df = tokenizer.transform(movie_review_df)
movie_review_df.show(n=5)

+-----+--------------------+---------+--------------------+
|  cid|          review_str|id_review|               Words|
+-----+--------------------+---------+--------------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|
|15738|This is an exampl...|        2|[this, is, an, ex...|
|15727|First of all I ha...|        3|[first, of, all, ...|
|17954|Not even the Beat...|        4|[not, even, the, ...|
|16579|Brass pictures (m...|        5|[brass, pictures,...|
+-----+--------------------+---------+--------------------+
only showing top 5 rows



In [6]:

# Filter stop words out of 'Words'; the result lands in 'words_filtered'.
remover = StopWordsRemover(
    outputCol='words_filtered',
    inputCol='Words',
)
movie_review_df = remover.transform(movie_review_df)
movie_review_df.show(n=5)

+-----+--------------------+---------+--------------------+--------------------+
|  cid|          review_str|id_review|               Words|      words_filtered|
+-----+--------------------+---------+--------------------+--------------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|
+-----+--------------------+---------+--------------------+--------------------+
only showing top 5 rows



In [7]:
# Print the column names, types and nullability after tokenizing and filtering.
movie_review_df.printSchema()

root
 |-- cid: integer (nullable = true)
 |-- review_str: string (nullable = true)
 |-- id_review: integer (nullable = true)
 |-- Words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [10]:
# Flag a review as positive (1) when its filtered tokens include the exact
# token "good"; every other review gets 0.
movie_review_df = movie_review_df.withColumn(
    "positive_review",
    when(
        array_contains(col("words_filtered"), "good"),
        1,
    ).otherwise(0),
)
movie_review_df.show(n=5)

+-----+--------------------+---------+--------------------+--------------------+---------------+
|  cid|          review_str|id_review|               Words|      words_filtered|positive_review|
+-----+--------------------+---------+--------------------+--------------------+---------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|              0|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|              0|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|              0|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|              0|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|              1|
+-----+--------------------+---------+--------------------+--------------------+---------------+
only showing top 5 rows



In [11]:
# Stamp every row with the load timestamp.
# The column is named 'insert_date' with no trailing whitespace: a stray space
# in the name forces every later reference to quote it exactly.
movie_review_df = movie_review_df.withColumn('insert_date', current_timestamp())
movie_review_df.show(5)

+-----+--------------------+---------+--------------------+--------------------+---------------+--------------------+
|  cid|          review_str|id_review|               Words|      words_filtered|positive_review|        insert_date |
+-----+--------------------+---------+--------------------+--------------------+---------------+--------------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|              0|2022-07-25 13:35:...|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|              0|2022-07-25 13:35:...|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|              0|2022-07-25 13:35:...|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|              0|2022-07-25 13:35:...|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|              1|2022-07-25 13:35:...|
+-----+--------------------+---------+------------------

In [12]:
# Keep only the id/flag/timestamp columns by dropping the raw text and token arrays.
moviereviews = movie_review_df.drop('review_str','Words','words_filtered')

In [13]:
# Preview the slimmed-down review table.
moviereviews.show(5)

+-----+---------+---------------+--------------------+
|  cid|id_review|positive_review|        insert_date |
+-----+---------+---------------+--------------------+
|13756|        1|              0|2022-07-25 13:35:...|
|15738|        2|              0|2022-07-25 13:35:...|
|15727|        3|              0|2022-07-25 13:35:...|
|17954|        4|              0|2022-07-25 13:35:...|
|16579|        5|              1|2022-07-25 13:35:...|
+-----+---------+---------------+--------------------+
only showing top 5 rows



In [14]:
# Export the cleaned review table as CSV, replacing any previous export.
# NOTE(review): the target name reads 'revies' — presumably a typo for
# 'reviews'; confirm with downstream readers before renaming.
moviereviews.write.csv('clean_movie_revies.csv', mode='overwrite')

# Log transformations

In [16]:
# Load the review logs; each row pairs an id_review with an XML string in 'log'.
log_review_df = spark.read.csv(
    'log_reviews.csv',
    header="true",
    inferSchema="true",
)
log_review_df.show(n=10)

+---------+--------------------+
|id_review|                 log|
+---------+--------------------+
|        1|<reviewlog><log><...|
|        2|<reviewlog><log><...|
|        3|<reviewlog><log><...|
|        4|<reviewlog><log><...|
|        5|<reviewlog><log><...|
|        6|<reviewlog><log><...|
|        7|<reviewlog><log><...|
|        8|<reviewlog><log><...|
|        9|<reviewlog><log><...|
|       10|<reviewlog><log><...|
+---------+--------------------+
only showing top 10 rows



In [17]:
# Remove the fixed opening tags up to <logDate> so the string starts with the
# date value, then remove the fixed closing tags after the phone number.
log_review_df = (
    log_review_df
    .withColumn('log', regexp_replace('log', '<reviewlog><log><logDate>', ''))
    .withColumn('log', regexp_replace('log', '</phoneNumber></log></reviewlog>', ''))
)
log_review_df.show(n=5)

+---------+--------------------+
|id_review|                 log|
+---------+--------------------+
|        1|04-25-2021</logDa...|
|        2|03-13-2021</logDa...|
|        3|09-30-2021</logDa...|
|        4|05-24-2021</logDa...|
|        5|02-01-2021</logDa...|
+---------+--------------------+
only showing top 5 rows



In [18]:
# The date is whatever precedes the '</logDate><device>' boundary.
log_review_df = log_review_df.withColumn(
    'log_date',
    split(col('log'), '</logDate><device>').getItem(0),
)
log_review_df.show(n=5)

+---------+--------------------+----------+
|id_review|                 log|  log_date|
+---------+--------------------+----------+
|        1|04-25-2021</logDa...|04-25-2021|
|        2|03-13-2021</logDa...|03-13-2021|
|        3|09-30-2021</logDa...|09-30-2021|
|        4|05-24-2021</logDa...|05-24-2021|
|        5|02-01-2021</logDa...|02-01-2021|
+---------+--------------------+----------+
only showing top 5 rows



In [19]:
# Two-step cut: 'device_drop' keeps the text before '</device><location>',
# and 'device' is the part of it that follows '</logDate><device>'.
log_review_df = (
    log_review_df
    .withColumn('device_drop', split(col('log'), '</device><location>').getItem(0))
    .withColumn('device', split(col('device_drop'), '</logDate><device>').getItem(1))
)

log_review_df.show(n=5)

+---------+--------------------+----------+--------------------+------+
|id_review|                 log|  log_date|         device_drop|device|
+---------+--------------------+----------+--------------------+------+
|        1|04-25-2021</logDa...|04-25-2021|04-25-2021</logDa...|Mobile|
|        2|03-13-2021</logDa...|03-13-2021|03-13-2021</logDa...|Tablet|
|        3|09-30-2021</logDa...|09-30-2021|09-30-2021</logDa...|Tablet|
|        4|05-24-2021</logDa...|05-24-2021|05-24-2021</logDa...|Tablet|
|        5|02-01-2021</logDa...|02-01-2021|02-01-2021</logDa...|Tablet|
+---------+--------------------+----------+--------------------+------+
only showing top 5 rows



In [20]:
# Same two-step cut for the location: truncate at '</location><os>' into the
# scratch column, then keep what follows '</device><location>'.
log_review_df = (
    log_review_df
    .withColumn('device_drop', split(col('log'), '</location><os>').getItem(0))
    .withColumn('location', split(col('device_drop'), '</device><location>').getItem(1))
)

log_review_df.show(n=5)

+---------+--------------------+----------+--------------------+------+-------------+
|id_review|                 log|  log_date|         device_drop|device|     location|
+---------+--------------------+----------+--------------------+------+-------------+
|        1|04-25-2021</logDa...|04-25-2021|04-25-2021</logDa...|Mobile|       Kansas|
|        2|03-13-2021</logDa...|03-13-2021|03-13-2021</logDa...|Tablet|       Oregon|
|        3|09-30-2021</logDa...|09-30-2021|09-30-2021</logDa...|Tablet|    Minnesota|
|        4|05-24-2021</logDa...|05-24-2021|05-24-2021</logDa...|Tablet|     Arkansas|
|        5|02-01-2021</logDa...|02-01-2021|02-01-2021</logDa...|Tablet|New Hampshire|
+---------+--------------------+----------+--------------------+------+-------------+
only showing top 5 rows



In [21]:
# Operating system: truncate at '</os><ipAddress>' into the scratch column,
# then keep what follows '</location><os>'.
log_review_df = (
    log_review_df
    .withColumn('device_drop', split(col('log'), '</os><ipAddress>').getItem(0))
    .withColumn('os', split(col('device_drop'), '</location><os>').getItem(1))
)

log_review_df.show(n=5)

+---------+--------------------+----------+--------------------+------+-------------+--------------+
|id_review|                 log|  log_date|         device_drop|device|     location|            os|
+---------+--------------------+----------+--------------------+------+-------------+--------------+
|        1|04-25-2021</logDa...|04-25-2021|04-25-2021</logDa...|Mobile|       Kansas|     Apple iOS|
|        2|03-13-2021</logDa...|03-13-2021|03-13-2021</logDa...|Tablet|       Oregon|Google Android|
|        3|09-30-2021</logDa...|09-30-2021|09-30-2021</logDa...|Tablet|    Minnesota|     Apple iOS|
|        4|05-24-2021</logDa...|05-24-2021|05-24-2021</logDa...|Tablet|     Arkansas|   Apple MacOS|
|        5|02-01-2021</logDa...|02-01-2021|02-01-2021</logDa...|Tablet|New Hampshire|         Linux|
+---------+--------------------+----------+--------------------+------+-------------+--------------+
only showing top 5 rows



In [22]:
# The phone number is the tail of the log string, after '</ipAddress><phoneNumber>'.
# The IP address itself is not extracted into a column in this cell.
log_review_df = log_review_df.withColumn(
    'phoneNumber',
    split(col('log'), '</ipAddress><phoneNumber>').getItem(1),
)

log_review_df.show(n=5)

+---------+--------------------+----------+--------------------+------+-------------+--------------+------------+
|id_review|                 log|  log_date|         device_drop|device|     location|            os| phoneNumber|
+---------+--------------------+----------+--------------------+------+-------------+--------------+------------+
|        1|04-25-2021</logDa...|04-25-2021|04-25-2021</logDa...|Mobile|       Kansas|     Apple iOS|821-540-5777|
|        2|03-13-2021</logDa...|03-13-2021|03-13-2021</logDa...|Tablet|       Oregon|Google Android|819-102-1320|
|        3|09-30-2021</logDa...|09-30-2021|09-30-2021</logDa...|Tablet|    Minnesota|     Apple iOS|989-156-0498|
|        4|05-24-2021</logDa...|05-24-2021|05-24-2021</logDa...|Tablet|     Arkansas|   Apple MacOS|225-837-9935|
|        5|02-01-2021</logDa...|02-01-2021|02-01-2021</logDa...|Tablet|New Hampshire|         Linux|243-842-4562|
+---------+--------------------+----------+--------------------+------+-------------+---

In [None]:
#____________________________________

In [None]:
#   import com.databricks.spark.xml._
#   import com.databricks.spark.xml.functions.from_xml

#   val spark = SparkSession.builder().master("local[*]").getOrCreate()
#   import spark.implicits._
#   spark.sparkContext.setLogLevel("ERROR")

#   val df = // Read csv file
  
#   // Assuming your xml content column name is xmldata
#   val xmlSchema = schema_of_xml(df.select("xmldata").as[String])

#   df.withColumn("xmldata", from_xml('xmldata, xmlSchema))
#     .select("*", "xmldata.ab")
#     .selectExpr(df.columns.diff(Array("xmldata")) ++
#       Array("ab[0]._a as name", "ab[0]._b as id", "ab[1]._a as manager_name", "ab[1]._b as manager_id"): _*)
#     .show(false)

In [None]:
# !$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.15.0

:: loading settings :: url = jar:file:/content/spark-3.2.2-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c1fc4ee6-2798-4c49-83ff-ad94decf51a8;1.0
	confs: [default]
	found com.databricks#spark-xml_2.12;0.15.0 in central
	found commons-io#commons-io;2.11.0 in central
	found org.glassfish.jaxb#txw2;3.0.2 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.3.0 in central
downloading https://repo1.maven.org/maven2/com/databricks/spark-xml_2.12/0.15.0/spark-xml_2.12-0.15.0.jar ...
	[SUCCESSFUL ] com.databricks#spark-xml_2.12;0.15.0!spark-xml_2.12.jar (44ms)
downloading https://repo1.maven.org/maven2/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar ...
	[SUCCESSFUL ] commons-io#commons-io;2.11.0!commons-io.jar (38ms)
downloading https://re

In [None]:
# import os
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0 pyspark-shell'

In [None]:
# import databricks.spark.xml
# import databricks.spark.xml.functions.from_xml
# from pyspark

In [None]:

from pyspark.sql.types import *

In [None]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# from pyspark.sql.functions import *
# from decimal import Decimal
# appName = "Python Example - PySpark Read XML"
# master = "local"

# # Create Spark session
# spark_2 = SparkSession.builder \
#     .appName(appName) \
#     .master(master) \
#     .config("spark.jars.packages", "com.databricks:spark-xml_2.11:0.9.0") \
#     .getOrCreate()

In [None]:
# Schema with one nullable string field per tag name used in the log records.
customSchema = StructType([
    StructField(tag_name, StringType(), True)
    for tag_name in (
        "logDate",
        "device",
        "location",
        "os",
        "ipAddress",
        "phoneNumber",
    )
])

In [23]:
# Second load of the log file, this time with multi-line record support enabled.
sourceDf = spark.read.csv(
    'log_reviews.csv',
    multiLine=True,
    inferSchema=True,
    header=True,
    sep=',',
)
sourceDf.show(n=5)

+---------+--------------------+
|id_review|                 log|
+---------+--------------------+
|        1|<reviewlog><log><...|
|        2|<reviewlog><log><...|
|        3|<reviewlog><log><...|
|        4|<reviewlog><log><...|
|        5|<reviewlog><log><...|
+---------+--------------------+
only showing top 5 rows



In [None]:
# xmlfile = sourceDf.select('log')
# xmlfile.show(5)

+--------------------+
|                 log|
+--------------------+
|<reviewlog><log><...|
|<reviewlog><log><...|
|<reviewlog><log><...|
|<reviewlog><log><...|
|<reviewlog><log><...|
+--------------------+
only showing top 5 rows



In [29]:
# Strip the fixed opening tags (up to <logDate>) and the fixed closing tags
# (after the phone number) so 'log' starts and ends with field values.
sourceDf = (
    sourceDf
    .withColumn('log', regexp_replace('log', '<reviewlog><log><logDate>', ''))
    .withColumn('log', regexp_replace('log', '</phoneNumber></log></reviewlog>', ''))
)

# Turn every remaining tag into ';', then collapse the ';;' left by each
# adjacent closing/opening tag pair into a single delimiter.
# The pattern is a raw string: '\<' and '\>' are not valid Python escape
# sequences, so the non-raw literal triggers an invalid-escape warning.
log_review_df_2 = (
    sourceDf
    .withColumn('log', regexp_replace('log', r'\<(.*?)\>', ';'))
    .withColumn('log', regexp_replace('log', ';;', ';'))
)

# Split once and pick each field by position instead of repeating the split
# expression for every output column.
log_fields = split(col('log'), ';')
for position, field_name in enumerate(
        ['log_date', 'device', 'location', 'os', 'ip_address', 'phoneNumber']):
    log_review_df_2 = log_review_df_2.withColumn(
        field_name, log_fields.getItem(position))


log_review_df_2.show(5)

+---------+--------------------+----------+------+-------------+--------------+------------+------------+
|id_review|                 log|  log_date|device|     location|            os|  ip_address| phoneNumber|
+---------+--------------------+----------+------+-------------+--------------+------------+------------+
|        1|04-25-2021;Mobile...|04-25-2021|Mobile|       Kansas|     Apple iOS|9.200.232.57|821-540-5777|
|        2|03-13-2021;Tablet...|03-13-2021|Tablet|       Oregon|Google Android|9.200.232.57|819-102-1320|
|        3|09-30-2021;Tablet...|09-30-2021|Tablet|    Minnesota|     Apple iOS|9.200.232.57|989-156-0498|
|        4|05-24-2021;Tablet...|05-24-2021|Tablet|     Arkansas|   Apple MacOS|9.200.232.57|225-837-9935|
|        5|02-01-2021;Tablet...|02-01-2021|Tablet|New Hampshire|         Linux|9.200.232.57|243-842-4562|
+---------+--------------------+----------+------+-------------+--------------+------------+------------+
only showing top 5 rows



In [30]:
# The delimited 'log' string is no longer needed once its fields are split out.
log_review_df_2 = log_review_df_2.drop('log')

In [31]:
# Preview the parsed log table without the intermediate 'log' column.
log_review_df_2.show(5)

+---------+----------+------+-------------+--------------+------------+------------+
|id_review|  log_date|device|     location|            os|  ip_address| phoneNumber|
+---------+----------+------+-------------+--------------+------------+------------+
|        1|04-25-2021|Mobile|       Kansas|     Apple iOS|9.200.232.57|821-540-5777|
|        2|03-13-2021|Tablet|       Oregon|Google Android|9.200.232.57|819-102-1320|
|        3|09-30-2021|Tablet|    Minnesota|     Apple iOS|9.200.232.57|989-156-0498|
|        4|05-24-2021|Tablet|     Arkansas|   Apple MacOS|9.200.232.57|225-837-9935|
|        5|02-01-2021|Tablet|New Hampshire|         Linux|9.200.232.57|243-842-4562|
+---------+----------+------+-------------+--------------+------------+------------+
only showing top 5 rows



In [32]:
# Export the parsed log table as CSV, replacing any previous export.
log_review_df_2.write.csv('clean_log_reviews.csv', mode='overwrite')

In [34]:
# Second CSV export of the parsed log table. An explicit overwrite mode is set
# so re-running the notebook does not fail because 'mycsv.csv' already exists
# (the writer's default mode raises on an existing path).
log_review_df_2.write.format('csv').mode('overwrite').save('mycsv.csv')

In [None]:
# xmlSchema = spark.read.format('xml').options(rowTag='log').load(log_review_df.select("log"))
# Attempt to parse the log records with the spark-xml reader, using 'log' as the
# row tag and customSchema as the schema.
# NOTE(review): load() is handed a DataFrame here, while the reader API is
# documented to take file path(s); this cell has no execution count, so it
# presumably never ran successfully — confirm before relying on `df`.
# NOTE(review): no active cell in this notebook adds the spark-xml package to
# the session (the PYSPARK_SUBMIT_ARGS / spark.jars.packages attempts are
# commented out) — verify the package is available.
df = (spark.read.format("com.databricks.spark.xml"
              ).option("rowTag","log"
              ).load(log_review_df.select("log"), 
                     schema=customSchema))

In [None]:
# Reload the untouched log file, replacing the earlier transformed log_review_df.
log_review_df = spark.read.csv(
    'log_reviews.csv',
    header="true",
    inferSchema="true",
)
log_review_df.show(n=10)

+---------+--------------------+
|id_review|                 log|
+---------+--------------------+
|        1|<reviewlog><log><...|
|        2|<reviewlog><log><...|
|        3|<reviewlog><log><...|
|        4|<reviewlog><log><...|
|        5|<reviewlog><log><...|
|        6|<reviewlog><log><...|
|        7|<reviewlog><log><...|
|        8|<reviewlog><log><...|
|        9|<reviewlog><log><...|
|       10|<reviewlog><log><...|
+---------+--------------------+
only showing top 10 rows



In [None]:


#   df.withColumn("xmldata", from_xml('xmldata, xmlSchema))
#     .select("*", "xmldata.ab")
#     .selectExpr(df.columns.diff(Array("xmldata")) ++
#       Array("ab[0]._a as name", "ab[0]._b as id", "ab[1]._a as manager_name", "ab[1]._b as manager_id"): _*)
#     .show(false)

In [None]:
# Load the purchase records; the first line is a header and column types are inferred.
user_purchase_df = spark.read.csv(
    'user_purchase.csv',
    header="true",
    inferSchema="true",
)

In [None]:
# Preview the first five purchase rows.
user_purchase_df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows

