In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
import datetime

In [2]:
# Build (or reuse) a Hive-enabled Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("HiveDatabases")
    .enableHiveSupport()
    .getOrCreate()
)


In [3]:
# Load every row of the Hive table commerce.customers into a DataFrame
customers_query = "SELECT * FROM commerce.customers"
df1 = spark.sql(customers_query)

In [4]:
# Display the DataFrame repr (column names and types only; no rows are fetched)
df1

DataFrame[id: int, nickname: string]

In [5]:
# List the column names of the customers DataFrame
df1.columns

['id', 'nickname']

In [6]:
# Count the customers rows (this triggers a Spark job) and report the total
customer_row_count = df1.count()
print("Number of rows: ", customer_row_count)

[Stage 0:>                                                          (0 + 1) / 1]

Number of rows:  678


                                                                                

In [7]:
# Print the customers schema as a tree: column name, type and nullability
df1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nickname: string (nullable = true)



In [8]:
# Preview the first 10 rows of the customers DataFrame
df1.show(10)

+-----+-----------------+
|   id|         nickname|
+-----+-----------------+
|    1|            admin|
|    2|      steve_gates|
|    3|    arthur_holmes|
|    4|        james_pan|
|    5|  brenda_lindgren|
|    6|victoria_victoria|
|13874|      pheptb17411|
|13878|     nghiatd17411|
|13879|     thuanta17411|
|13885|     duongnt17411|
+-----+-----------------+
only showing top 10 rows



=================================================================================================================================================================

In [9]:
# Load every row of the Hive table commerce.products into a DataFrame
products_query = "SELECT * FROM commerce.products"
df2 = spark.sql(products_query)

In [10]:
# Display the DataFrame repr (column names and types only; no rows are fetched)
df2

DataFrame[id: int, name: string, unitprice: double]

In [11]:
# List the column names of the products DataFrame
df2.columns

['id', 'name', 'unitprice']

In [12]:
# Count the products rows (this triggers a Spark job) and report the total
product_row_count = df2.count()
print("Number of rows: ", product_row_count)

Number of rows:  691


In [13]:
# Print the products schema as a tree: column name, type and nullability
df2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- unitprice: double (nullable = true)



In [14]:
# Preview the first 10 rows of the products DataFrame
# (long string values are truncated with "..." in the printed table)
df2.show(10)

+---+--------------------+---------+
| id|                name|unitprice|
+---+--------------------+---------+
|  1|Build your own co...|   1200.0|
|  2|Digital Storm VAN...|   1259.0|
|  3|Lenovo IdeaCentre...|    500.0|
|  4|Apple MacBook Pro...|   1800.0|
|  5|Asus N551JK-XO076...|   1500.0|
|  6|Samsung Series 9 ...|   1590.0|
|  7|HP Spectre XT Pro...|   1350.0|
|  8|HP Envy 6-1180ca ...|   1460.0|
|  9|Lenovo Thinkpad X...|   1360.0|
| 10| Adobe Photoshop CS4|     75.0|
+---+--------------------+---------+
only showing top 10 rows



=================================================================================================================================================================

In [15]:
# Load every row of the Hive table commerce.ratings into a DataFrame
ratings_query = "SELECT * FROM commerce.ratings"
df3 = spark.sql(ratings_query)

In [16]:
# Display the DataFrame repr (column names and types only; no rows are fetched)
df3

DataFrame[createdate: string, customerid: int, productid: int, rate: int]

In [17]:
# List the column names of the ratings DataFrame
df3.columns

['createdate', 'customerid', 'productid', 'rate']

In [18]:
# Count the ratings rows (this triggers a Spark job) and report the total
rating_row_count = df3.count()
print("Number of rows: ", rating_row_count)

Number of rows:  130754


In [19]:
# Print the ratings schema as a tree: column name, type and nullability
# (note that createdate is stored as a string, not a date)
df3.printSchema()

root
 |-- createdate: string (nullable = true)
 |-- customerid: integer (nullable = true)
 |-- productid: integer (nullable = true)
 |-- rate: integer (nullable = true)



In [20]:
# Preview the first 10 rows of the ratings DataFrame
df3.show(10)

+----------+----------+---------+----+
|createdate|customerid|productid|rate|
+----------+----------+---------+----+
|2017-12-31|    103416|      619|   1|
|2017-12-31|    103654|      411|   1|
|2017-12-31|    103954|      298|   3|
|2017-12-31|    103672|      361|   5|
|2017-12-31|    103960|      536|   5|
|2017-12-31|    103372|      481|   2|
|2017-12-31|    103444|      132|   1|
|2017-12-31|    103831|       41|   1|
|2017-12-31|    103541|      498|   5|
|2017-12-31|    103819|      155|   4|
+----------+----------+---------+----+
only showing top 10 rows



## 1. Cleaning: This task involves removing or correcting any invalid, incomplete, or irrelevant data. For example, you can drop rows with missing values, remove duplicate rows, or correct any incorrect data values.

In [21]:
# For each table: drop rows containing any null value, then drop exact duplicate rows
df1 = df1.dropna().dropDuplicates()
df2 = df2.dropna().dropDuplicates()
df3 = df3.dropna().dropDuplicates()

# Keep only ratings on the 1-5 scale (out-of-range rows are discarded, not corrected)
df3 = df3.filter(F.col("Rate").between(1, 5))

## 2. Normalization: This task involves transforming the data into a consistent format. For example, you can standardize the date format, convert all string values to lowercase, or map categorical values to numerical values.

In [22]:
# Normalize customer nicknames to lowercase
nickname_lower = F.lower(F.col("nickname"))
df1 = df1.withColumn("nickname", nickname_lower)

In [23]:
# Normalize product names to lowercase
name_lower = F.lower(F.col("Name"))
df2 = df2.withColumn("Name", name_lower)

In [24]:
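# (Disabled) Map the "Rate" column to its numeric scale (1 = poor, 2 = average, 3 = good, 4 = very good, 5 = excellent).
# The UDF below returns 1-4 unchanged and 5 for anything else; "Rate" is already an integer filtered to 1-5, so it would be a no-op.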
#map_udf = udf(lambda x: 1 if x == 1 else 2 if x == 2 else 3 if x == 3 else 4 if x == 4 else 5, IntegerType())
#df3 = df3.withColumn("Rate", map_udf(df3["Rate"]))

## 3. Feature Engineering: This task involves creating new features or variables from the existing data. For example, you can create new columns by extracting information from existing columns, create interaction variables between multiple columns, or create dummy variables for categorical variables.

In [25]:
# Bucket each product by unit price: below 500 -> "Low", 500 up to (but excluding) 1200 -> "Medium", everything else -> "High"
unit_price = F.col("UnitPrice")
price_range = (
    F.when(unit_price < 500, "Low")
    .when((unit_price >= 500) & (unit_price < 1200), "Medium")
    .otherwise("High")
)
df2 = df2.withColumn("PriceRange", price_range)

In [26]:
def assign_category(product):
    """Return the category label for a product name.

    The name is matched against keyword lists by case-insensitive substring
    search; the first rule (in the order listed below) with a matching keyword
    wins. Matching is case-insensitive because the notebook lower-cases the
    "Name" column before calling this function, so mixed-case keywords such as
    "iPhone" or "Gift Cards" could never match with a case-sensitive test.

    Args:
        product: Product name. ``None`` or an empty string yields "Others".

    Returns:
        One of "Jewelry", "Electronics", "Digital downloads", "Apparel",
        "DỰ ÁN TMĐT", "Computers", "Gift Cards" or "Others".
    """
    # Missing names cannot be searched; treat them as uncategorized.
    if not product:
        return "Others"

    # Rule order is significant: earlier categories take precedence.
    # NOTE(review): substring matching means short keywords such as "ring"
    # also hit unrelated words ("string", "spring"); kept as in the original.
    category_rules = (
        ("Jewelry", (
            "necklace", "ring", "earrings", "bracelets", "gold", "silver",
            "tungsten carbide", "diamonds", "gemstones", "amethyst",
            "aquamarine", "blue topaz",
        )),
        ("Electronics", (
            "laptop", "speakers", "bamboo", "beverage bag", "thermos bottle",
            "basic tee", "Xiaomi", "iPhone",
        )),
        ("Digital downloads", (
            "books", "software programs", "Windows", "Digital downloads",
        )),
        ("Apparel", (
            "clothing", "t-shirt", "jeans", "sneaker", "shoes", "jacket",
            "joggers", "vans", "nike air force",
        )),
        ("DỰ ÁN TMĐT", ("DỰ ÁN TMĐT",)),
        ("Computers", (
            "cell phone", "smartwatch", "smart home", "smart lighting",
            "smart lock",
        )),
        ("Gift Cards", ("Gift Cards",)),
    )

    lowered_name = product.lower()
    for category, keywords in category_rules:
        if any(keyword.lower() in lowered_name for keyword in keywords):
            return category
    return "Others"

# Wrap assign_category as a string-returning Spark UDF and derive a "category" column from the product name
udf_assign_category = F.udf(assign_category, StringType())
category_column = udf_assign_category("Name")
df2 = df2.withColumn("category", category_column)


In [27]:
# Derive calendar "year" and "month" columns from the rating's creation date
created_on = F.col("CreateDate")
df3 = (
    df3
    .withColumn("year", F.year(created_on))
    .withColumn("month", F.month(created_on))
)

## 4. Data Transformation: This task involves transforming the data into a format that is suitable for further analysis or modeling. For example, you can pivot the data, aggregate the data, or reshape the data.

In [28]:
# Render the cleaned customers as a pandas DataFrame.
# NOTE: toPandas() collects every row to the driver; acceptable for this small
# table (678 rows before cleaning), but limit the DataFrame first for larger data.
df1.toPandas()

Unnamed: 0,id,nickname
0,103560,anh.luongbao
1,103675,huonghue.che
2,103972,hong.dang.9235
3,103450,letranglotus
4,103324,phuc.trinh.313
...,...,...
673,103596,thanhcuong.tc
674,103632,thao.bui.50
675,103696,bryan846
676,103913,minh.nguyenthianh.7


In [29]:
# Wide view of products: one row per product id, one column per distinct product name,
# each cell holding the first unit price seen for that (id, name) pair
products_by_id = df2.groupBy("Id")
pivot_df2 = products_by_id.pivot("Name").agg(F.first("UnitPrice"))

In [30]:
# Average rating per product
avg_rate = F.avg("Rate").alias("AvgRate")
df3_agg = df3.groupBy("ProductID").agg(avg_rate)

# Reshape to a single row with one column per product id holding that product's average rating
df3_pivot = (
    df3_agg
    .groupBy()
    .pivot("ProductID")
    .agg(F.first("AvgRate"))
)

In [31]:
# Drop the raw creation-date string; the "year" and "month" columns derived from it earlier are kept
df3 = df3.drop('CreateDate')


## 5. Data Validation: This task involves validating the data to ensure that it meets the required quality standards. For example, you can check for missing values, outliers, or inconsistent data values.

In [32]:
# Validation: list customers whose nickname is null (an empty table means none remain)
df1.filter(isnull(df1["NickName"])).show()

+---+--------+
| id|nickname|
+---+--------+
+---+--------+



In [33]:
# Validation: list nicknames shared by more than one customer (an empty table means all are unique)
df1.groupBy("NickName").count().filter("count > 1").show()

[Stage 30:>                                                         (0 + 1) / 1]

+--------+-----+
|NickName|count|
+--------+-----+
+--------+-----+



                                                                                

In [34]:
# Treat unit prices of 2000 or more as outliers and drop those products
product_price = F.col("UnitPrice")
df2 = df2.filter(product_price < 2000)

# Flag negative prices as invalid; every other row is marked valid
valid_flag = F.when(product_price < 0, False).otherwise(True)
df2 = df2.withColumn("is_valid", valid_flag)


In [35]:
# Count nulls per column of the ratings DataFrame.
# when() is given the column *name* string as its value, which PySpark treats as a
# non-null literal, so count() tallies exactly the rows where that column is null.
null_counters = [
    F.count(F.when(F.isnull(column_name), column_name)).alias(column_name)
    for column_name in df3.columns
]
missing_values = df3.select(null_counters).collect()
print(missing_values)




[Row(customerid=0, productid=0, rate=0, year=0, month=0)]


                                                                                

In [36]:
# Summary statistics (count, mean, stddev, min, max) for every column of the ratings DataFrame.
# show() prints the table itself and returns None, so it must not be wrapped in
# print() -- doing so emits a stray "None" after the table.
summary = df3.describe()
summary.show()

[Stage 44:>                                                         (0 + 2) / 2]

+-------+-----------------+-----------------+------------------+------------------+------------------+
|summary|       customerid|        productid|              rate|              year|             month|
+-------+-----------------+-----------------+------------------+------------------+------------------+
|  count|           130748|           130748|            130748|            130748|            130748|
|   mean|99816.64361978769|345.4398231712913|3.1885535534004346|2019.6103190870988| 6.322131122464588|
| stddev|  18029.136244901|199.3425933833639|1.5158247032388799|1.2061731658091754|3.5020728905885448|
|    min|                3|                1|                 1|              2017|                 1|
|    max|           103997|              691|                 5|              2022|                12|
+-------+-----------------+-----------------+------------------+------------------+------------------+

None


                                                                                

In [37]:
# Keep only ratings inside the expected 1-5 range (this filters rows, it does not just check).
# The cleaning step already applies the same filter, so this is a defensive re-application.
df3 = df3.filter((df3.rate >= 1) & (df3.rate <= 5))

In [38]:
# Persist each cleaned DataFrame with saveAsTable, replacing any existing table of the same name
cleaned_tables = [
    (df1, "commerce.cleaned_customers"),
    (df2, "commerce.cleaned_products"),
    (df3, "commerce.cleaned_ratings"),
]
for cleaned_frame, table_name in cleaned_tables:
    cleaned_frame.write.mode('overwrite').saveAsTable(table_name)

23/02/21 00:27:29 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                                                                

In [39]:
# Shut down the Spark session now that all tables have been written
spark.stop()