# Đồ án giữa kỳ
# Môn: Xử lý dữ liệu lớn
# Học kỳ 1 - Năm học 2022-2023
# Giảng viên: Th.S Nguyễn Thành An

# Cài đặt PySpark

In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
# NOTE(review): the cells visible in this notebook read and write under
# /content only, so this mount may be optional — confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the headless OpenJDK 8 package quietly (stdout is discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.1.1 / Hadoop 3.2 binary archive.
# NOTE(review): the URL uses plain http and the archive is not checksum-verified.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# Unpack the archive into the working directory and install findspark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
import findspark

# Point PySpark at the JDK and at the unpacked Spark distribution. Both
# variables are set before findspark.init() is called.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

# Let findspark locate the Spark installation so pyspark can be imported
# by the cells below.
findspark.init()

# Yêu cầu

## Spark Context

In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Local Spark context named "mid-term", plus the SQL entry point that later
# cells use to read CSV files and build DataFrames.
sc = SparkContext(master="local", appName="mid-term")
sqlc = SQLContext(sparkContext=sc)

## Đọc dữ liệu data.csv

In [None]:
# Load the raw transactions file as an RDD of text lines. The header line is
# not removed here, so it passes through the RDD pipelines below as a record.
data = sc.textFile('/content/data.csv')

## Câu 1: Đếm món hàng

In [None]:
'''
Câu 1
'''

def preprocessLine(x):
  """Turn one CSV line into a ``("<field0>,<field1>", 1)`` counting pair.

  The line is stripped and split on ','; only its first two fields are
  used. A line with fewer than two fields raises IndexError.
  """
  fields = x.strip().split(',')
  key = ','.join((fields[0], fields[1]))
  return (key, 1)

def f(x):
  """Format a ``(key, count)`` pair as one comma-separated output line.

  ``x[0]`` is a comma-joined key; its first two fields are written back,
  followed by ``str(x[1])``.
  """
  key_fields = x[0].split(',')
  return ','.join([key_fields[0], key_fields[1], str(x[1])])

# Count how many input lines share the same first two fields, format each
# result as a CSV line, and write the lines out as text.
# NOTE(review): saveAsTextFile writes a directory of part files and is
# expected to fail if the target path already exists — confirm before
# re-running this cell.
data_F = (
    data.map(preprocessLine)
        .reduceByKey(lambda left, right: left + right)
        .map(f)
)
data_F.saveAsTextFile('/content/counters')

In [None]:
import os

# Preview the counts as a table. Only the first part file is read, and its
# first line is used as the header row.
# NOTE(review): in the output shown below, the CSV header record came through
# the counting pipeline as "Member_number,Date,1", hence the column named "1".
countersPart = '/content/counters/part-00000'
if os.path.exists(countersPart):
  sqlc.read.csv(countersPart, header=True).show()

+-------------+----------+---+
|Member_number|      Date|  1|
+-------------+----------+---+
|         1249|01/01/2014|  2|
|         1381|01/01/2014|  2|
|         1440|01/01/2014|  2|
|         1659|01/01/2014|  2|
|         1789|01/01/2014|  2|
|         1922|01/01/2014|  2|
|         2226|01/01/2014|  2|
|         2237|01/01/2014|  2|
|         2351|01/01/2014|  2|
|         2542|01/01/2014|  2|
|         2610|01/01/2014|  3|
|         2709|01/01/2014|  2|
|         2727|01/01/2014|  2|
|         2943|01/01/2014|  2|
|         2974|01/01/2014|  3|
|         3681|01/01/2014|  3|
|         3797|01/01/2014|  2|
|         3942|01/01/2014|  3|
|         3956|01/01/2014|  4|
|         4260|01/01/2014|  2|
+-------------+----------+---+
only showing top 20 rows



## Câu 2: Giỏ hàng

In [None]:
'''
Câu 2
'''
def get_key_value(x):
  """Split one CSV line into a ``("<field0>;<field1>;", <field2>)`` pair.

  The line is stripped and split on ','; fields beyond the third are
  ignored. A line with fewer than three fields raises IndexError.
  """
  fields = x.strip().split(',')
  # The trailing empty element gives the key its closing ';'.
  key = ';'.join((fields[0], fields[1], ''))
  return (key, fields[2])

def f2(x):
  """Format a ``(key, items)`` pair as one ';'-separated output line.

  ``x[0]`` is a ';'-joined key; its first two fields are written back,
  followed by ``str(x[1])``.
  """
  key_fields = x[0].split(';')
  return ';'.join([key_fields[0], key_fields[1], str(x[1])])

# Group the third field of every line under its (field0, field1) key by
# joining the values with ',', then write one ';'-separated line per key.
# NOTE(review): saveAsTextFile is expected to fail if the target path
# already exists — confirm before re-running this cell.
data_2 = (
    data.map(get_key_value)
        .reduceByKey(lambda joined, item: joined+','+item)
        .map(f2)
)
data_2.saveAsTextFile('/content/baskets')

In [None]:
import os

# Preview the baskets as a table. Only the first part file is read, with ';'
# as the separator and its first line used as the header row.
basketsPart = '/content/baskets/part-00000'
if os.path.exists(basketsPart):
  sqlc.read.csv(basketsPart, header=True, sep=';').show()

+-------------+----------+--------------------+
|Member_number|      Date|     itemDescription|
+-------------+----------+--------------------+
|         1249|01/01/2014| citrus fruit,coffee|
|         1381|01/01/2014|           curd,soda|
|         1440|01/01/2014|other vegetables,...|
|         1659|01/01/2014|specialty chocola...|
|         1789|01/01/2014|hamburger meat,ca...|
|         1922|01/01/2014|tropical fruit,ot...|
|         2226|01/01/2014|sausage,bottled w...|
|         2237|01/01/2014|bottled water,Ins...|
|         2351|01/01/2014|cleaner,shopping ...|
|         2542|01/01/2014|sliced cheese,bot...|
|         2610|01/01/2014|hamburger meat,bo...|
|         2709|01/01/2014|yogurt,frozen veg...|
|         2727|01/01/2014|hamburger meat,fr...|
|         2943|01/01/2014|whole milk,flower...|
|         2974|01/01/2014|berries,whipped/s...|
|         3681|01/01/2014|onions,whipped/so...|
|         3797|01/01/2014|  waffles,whole milk|
|         3942|01/01/2014|other vegetabl

## Câu 3: Tập phổ biến

In [None]:
def convertToItemsList(x):
  """Parse one ';'-separated basket line into ``(field0, field1, items)``.

  The third field is split on ',' and de-duplicated through a set, so the
  order of the returned item list is not guaranteed. A line with fewer than
  three ';'-separated fields raises IndexError.
  """
  fields = x.strip().split(';')
  unique_items = set(fields[2].split(','))
  return (fields[0], fields[1], list(unique_items))

# Load the saved baskets back from the first part file, skipping its first
# line (the header record), and turn them into a DataFrame with one item list
# per row.
# The file is opened in a `with` block so the handle is closed once the lines
# have been read; the encoding is given explicitly rather than taken from the
# locale.
with open('/content/baskets/part-00000', 'r', encoding='utf-8') as basketsFile:
  tmp = [line.strip() for line in basketsFile][1:]
tmp = sc.parallelize(tmp)
tmp = tmp.map(convertToItemsList)
dfBaskets = sqlc.createDataFrame(tmp,["Member_number","Date","Items"])
dfBaskets.show()

from pyspark.ml.fpm import FPGrowth
# Mine frequent itemsets and association rules from the "Items" column,
# using a minimum support of 0.01 and a minimum confidence of 0.1.
fpGrowth = FPGrowth(minSupport=0.01, minConfidence=0.1, itemsCol="Items")
model = fpGrowth.fit(dfBaskets)

# Show the frequent itemsets first, then the derived rules.
model.freqItemsets.show()
model.associationRules.show()

+-------------+----------+--------------------+
|Member_number|      Date|               Items|
+-------------+----------+--------------------+
|         1249|01/01/2014|[coffee, citrus f...|
|         1381|01/01/2014|        [soda, curd]|
|         1440|01/01/2014|[yogurt, other ve...|
|         1659|01/01/2014|[specialty chocol...|
|         1789|01/01/2014|[hamburger meat, ...|
|         1922|01/01/2014|[other vegetables...|
|         2226|01/01/2014|[sausage, bottled...|
|         2237|01/01/2014|[bottled water, I...|
|         2351|01/01/2014|[shopping bags, c...|
|         2542|01/01/2014|[bottled water, s...|
|         2610|01/01/2014|[hamburger meat, ...|
|         2709|01/01/2014|[yogurt, frozen v...|
|         2727|01/01/2014|[hamburger meat, ...|
|         2943|01/01/2014|[whole milk, flow...|
|         2974|01/01/2014|[bottled water, w...|
|         3681|01/01/2014|[onions, whipped/...|
|         3797|01/01/2014|[whole milk, waff...|
|         3942|01/01/2014|[yogurt, other

## Câu 4: Giỏ hàng thành vectors 

In [None]:
from pyspark.ml.linalg import Vectors

# Re-read the raw CSV as a DataFrame, using its first line as the header.
tmp = (
    sqlc.read
        .option("delimiter", ",")
        .option("header", "true")
        .csv('/content/data.csv')
)

# One row per member: every itemDescription value recorded for that member,
# joined into a single comma-separated string (repeats are kept).
dfMembers = (
    tmp.select(['Member_number','itemDescription'])
       .rdd
       .reduceByKey(lambda joined, item: joined + "," + item)
       .toDF()
       .selectExpr("_1 as Member_number", "_2 as Items")
)

dfMembers.show()

# Sorted list of the distinct item names found in the data.
items = sorted(
    tmp.select('itemDescription')
       .rdd
       .distinct()
       .flatMap(lambda row: row)
       .collect()
)

# Map each item name to its position in the sorted list.
dictItems = {name: position for position, name in enumerate(items)}
print(items)
print(dictItems)

def basket2vector(member, basket, dictItems):
  """Encode one basket as a binary sparse vector over the known items.

  Args:
    member: Member identifier. Unused; kept so existing callers still work.
    basket: Either a comma-separated string of item names or an iterable of
      item names.
    dictItems: Mapping from item name to its position in the vector.

  Returns:
    A sparse vector of size ``len(dictItems)`` holding 1.0 at the position
    of every basket item that is a key of ``dictItems``.
  """
  # A string basket has to be split before testing membership: `name in
  # basket` on the raw string is a substring test, so an item whose name is
  # contained in another item's name (a hypothetical "milk" next to
  # "whole milk") would be marked present without having been bought.
  if isinstance(basket, str):
    basketItems = set(basket.split(','))
  else:
    basketItems = set(basket)
  # Sort explicitly so the indices passed to Vectors.sparse are increasing
  # regardless of the iteration order of the set or of dictItems.
  index = sorted(dictItems[name] for name in basketItems if name in dictItems)
  value = [1.0] * len(index)
  return Vectors.sparse(len(dictItems),index,value)

# Smoke-test the encoder on the first row of dfMembers.
# NOTE(review): dfMembers.first() is evaluated twice in this call.
print(basket2vector(dfMembers.first()['Member_number'],
                    dfMembers.first()['Items'],
                    dictItems))

+-------------+--------------------+
|Member_number|               Items|
+-------------+--------------------+
|         1249|citrus fruit,coff...|
|         1381|curd,soda,coffee,...|
|         1440|other vegetables,...|
|         1659|specialty chocola...|
|         1789|hamburger meat,ca...|
|         1922|tropical fruit,ot...|
|         2226|sausage,bottled w...|
|         2237|bottled water,Ins...|
|         2351|cleaner,shopping ...|
|         2542|sliced cheese,bot...|
|         2610|hamburger meat,bo...|
|         2709|yogurt,frozen veg...|
|         2727|hamburger meat,fr...|
|         2943|whole milk,flower...|
|         2974|berries,whipped/s...|
|         3681|onions,whipped/so...|
|         3797|waffles,whole mil...|
|         3942|other vegetables,...|
|         3956|yogurt,shopping b...|
|         4260|soda,brown bread,...|
+-------------+--------------------+
only showing top 20 rows

['Instant food products', 'UHT-milk', 'abrasive cleaner', 'artif. sweetener', 'baby co

## Câu 5: Giỏ hàng tương tự

In [None]:
from pyspark.ml.feature import MinHashLSH
from pyspark.sql.functions import col,monotonically_increasing_id

'''preprocess'''

# Replace each member's item string with its sparse indicator vector.
new_dfMembers = (
    dfMembers.rdd
             .map(lambda row: (row[0], basket2vector(row[0], row[1], dictItems)))
             .toDF()
             .select(col('_1').alias('Member_number'), col('_2').alias('Items'))
)

# Fit MinHash LSH with 5 hash tables on the item vectors.
mh_lsh = MinHashLSH(inputCol="Items", outputCol="Hashes", numHashTables=5)
model = mh_lsh.fit(new_dfMembers)

# Show every member together with its hash values.
model.transform(new_dfMembers).show()

# Self-join the members with a distance threshold of 0.3. The final filter
# drops zero-distance pairs, which includes each member paired with itself.
(
    model.approxSimilarityJoin(new_dfMembers, new_dfMembers, 0.3, distCol="JaccardDistance")
         .select(col("datasetA.Member_number").alias("idA"),
                 col("datasetB.Member_number").alias("idB"),
                 col("JaccardDistance"))
         .filter(col("JaccardDistance") > 0)
         .show()
)

+-------------+--------------------+--------------------+
|Member_number|               Items|              Hashes|
+-------------+--------------------+--------------------+
|         1249|(167,[11,30,34,61...|[[6.18856876E8], ...|
|         1381|(167,[1,10,11,28,...|[[3.6840299E8], [...|
|         1440|(167,[5,28,64,102...|[[1.8350541E8], [...|
|         1659|(167,[12,14,26,28...|[[6.4164136E7], [...|
|         1789|(167,[8,18,30,44,...|[[3.60000264E8], ...|
|         1922|(167,[10,12,15,16...|[[2.94443958E8], ...|
|         2226|(167,[9,12,23,40,...|[[1.09546378E8], ...|
|         2237|(167,[0,12,27,34,...|[[2.718462E7], [8...|
|         2351|(167,[5,31,34,49,...|[[1.8350541E8], [...|
|         2542|(167,[12,82,88,94...|[[3.1385983E7], [...|
|         2610|(167,[5,11,49,67,...|[[1.8350541E8], [...|
|         2709|(167,[12,30,40,44...|[[5.8187736E8], [...|
|         2727|(167,[11,38,52,56...|[[5.49099207E8], ...|
|         2943|(167,[20,28,29,33...|[[2.86041232E8], ...|
|         2974

In [None]:
# Use the first member's vector as the query key. first() fetches one row
# instead of collecting the whole DataFrame to the driver just to index [0].
key = new_dfMembers.first()['Items']
# Show the 5 approximate nearest neighbours of that key.
model.approxNearestNeighbors(new_dfMembers, key, 5).show()

+-------------+--------------------+--------------------+------------------+
|Member_number|               Items|              Hashes|           distCol|
+-------------+--------------------+--------------------+------------------+
|         1249|(167,[11,30,34,61...|[[6.18856876E8], ...|               0.0|
|         1321|(167,[11,30,138],...|[[6.18856876E8], ...|               0.4|
|         1263|(167,[11,30,61,10...|[[6.18856876E8], ...|               0.5|
|         1794|(167,[11,30,138,1...|[[6.18856876E8], ...|0.5714285714285714|
|         2708|(167,[3,34,61,75,...|[[2.90242595E8], ...|0.5714285714285714|
+-------------+--------------------+--------------------+------------------+



## Câu 6: Phân cụm người dùng theo giỏ hàng

In [None]:
from pyspark.ml.clustering import KMeans, KMeansModel
# Five clusters with a fixed seed. maxIter is not set here, so the library
# default applies.
kmeans = KMeans(k=5, seed=1)
# KMeans is fitted on a column named "features"; the member number is kept
# alongside it under the name "label".
new_dfMembers_km = new_dfMembers.select(
    col('Member_number').alias('label'),
    col('Items').alias('features'),
)
model = kmeans.fit(new_dfMembers_km)
model.setPredictionCol('prediction')
# Show each member with its assigned cluster.
model.transform(new_dfMembers_km).show()


+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
| 1249|(167,[11,30,34,61...|         1|
| 1381|(167,[1,10,11,28,...|         3|
| 1440|(167,[5,28,64,102...|         2|
| 1659|(167,[12,14,26,28...|         0|
| 1789|(167,[8,18,30,44,...|         0|
| 1922|(167,[10,12,15,16...|         4|
| 2226|(167,[9,12,23,40,...|         0|
| 2237|(167,[0,12,27,34,...|         1|
| 2351|(167,[5,31,34,49,...|         2|
| 2542|(167,[12,82,88,94...|         0|
| 2610|(167,[5,11,49,67,...|         2|
| 2709|(167,[12,30,40,44...|         0|
| 2727|(167,[11,38,52,56...|         4|
| 2943|(167,[20,28,29,33...|         0|
| 2974|(167,[9,12,38,63,...|         4|
| 3681|(167,[38,47,63,88...|         4|
| 3797|(167,[15,38,64,10...|         4|
| 3942|(167,[0,5,21,31,3...|         2|
| 3956|(167,[0,5,15,16,2...|         2|
| 4260|(167,[14,20,27,38...|         4|
+-----+--------------------+----------+
only showing top 20 rows

