In [1]:
# lib imports
from pyspark.sql import SparkSession
import pandas as pd
import datetime

In [2]:
# load customer dataset
customer_df = spark.read.csv(
    "data/spent_customer.csv", header=True, mode="DROPMALFORMED"
)
print('No. customers: ', customer_df.count())
print('customer_df columns: ', customer_df.columns)
customer_df.show(5)

No. customers:  35180
customer_df columns:  ['customer_number']
+---------------+
|customer_number|
+---------------+
|          10000|
|          10001|
|          10002|
|          10003|
|          10008|
+---------------+
only showing top 5 rows



In [3]:
# load product-type dataset
type_df = spark.read.csv(
    "data/product_type.csv", header=True, mode="DROPMALFORMED"
)
print('No. prod type: ', type_df.count())
print('No. columns: ', type_df.columns)
type_df.show(5)

No. prod type:  65
No. columns:  ['product_type_code', 'product_type', 'product_category']
+-----------------+---------------+----------------+
|product_type_code|   product_type|product_category|
+-----------------+---------------+----------------+
|               AA|Classical Music|           Music|
|               AB|    Dance Music|           Music|
|               AC|     Jazz Music|           Music|
|               AD|    Blues Music|           Music|
|               AE|     Rock Music|           Music|
+-----------------+---------------+----------------+
only showing top 5 rows



In [4]:
# load customer_statistics dataset
stat_df = spark.read.csv(
    "data/customer_statistics.csv", header=True, mode="DROPMALFORMED"
)
stat_df = stat_df.select('customer_number', 'product_type_code', 'total_qty')
print('No. prod type: ', stat_df.count())
print('No. columns: ', stat_df.columns)
stat_df.show(5)

No. prod type:  3520270
No. columns:  ['customer_number', 'product_type_code', 'total_qty']
+---------------+-----------------+---------+
|customer_number|product_type_code|total_qty|
+---------------+-----------------+---------+
|          10000|               AA|        0|
|          10000|               AB|        0|
|          10000|               AC|        0|
|          10000|               AD|        0|
|          10000|               AE|        0|
+---------------+-----------------+---------+
only showing top 5 rows



In [5]:
# check customer no. in stat_df
stat_df.count()/type_df.count()

54158.0

In [6]:
# convert type_df to list
type_list = type_df.select('product_type_code').rdd.flatMap(lambda x: x).collect()
type_list.sort()
len(type_list)

65

In [7]:
# convert customer_df to list
customers_list = customer_df.select('customer_number').distinct().rdd.map(lambda r: r[0]).collect()
customers_list.sort()
print(customers_list)
len(customers_list)

['10000', '10001', '10002', '10003', '10008', '10009', '10010', '10012', '10013', '10017', '10018', '10019', '10020', '10024', '10025', '10026', '10027', '10028', '10037', '10038', '10041', '10042', '10044', '10051', '10055', '10056', '10058', '10060', '10067', '10068', '10072', '10073', '10078', '10079', '10080', '10086', '10087', '10089', '10091', '10092', '10093', '10095', '10098', '10100', '10101', '10104', '10106', '10108', '10110', '10112', '10114', '10115', '10117', '10118', '10121', '10123', '10124', '10125', '10126', '10130', '10134', '10135', '10140', '10141', '10143', '10149', '10151', '10155', '10157', '10159', '10160', '10161', '10163', '10169', '10172', '10178', '10179', '10184', '10187', '10193', '10195', '10197', '10199', '10205', '10206', '10212', '10213', '10214', '10217', '10219', '10220', '10224', '10226', '10227', '10229', '10230', '10231', '10232', '10236', '10237', '10238', '10239', '10241', '10242', '10243', '10248', '10250', '10251', '10252', '10258', '10260', 

35180

In [8]:
# create headers for result df
result_headers = ['customer_number']
result_headers.extend(type_list)
print(result_headers)
len(result_headers)

['customer_number', 'AA', 'AB', 'AC', 'AD', 'AE', 'AF', 'AG', 'AH', 'AI', 'AJ', 'AK', 'AL', 'AM', 'AN', 'AO', 'AP', 'AQ', 'AR', 'AS', 'AT', 'AU', 'AV', 'AW', 'AX', 'AY', 'AZ', 'BA', 'BB', 'BC', 'BD', 'BE', 'BF', 'BG', 'BH', 'BI', 'BJ', 'BK', 'BL', 'BM', 'BN', 'BO', 'BP', 'BQ', 'BR', 'BS', 'BT', 'BU', 'BV', 'BW', 'BX', 'BY', 'BZ', 'CA', 'CC', 'CD', 'CE', 'CF', 'CG', 'CH', 'CI', 'CJ', 'CK', 'CL', 'CM', 'CN']


66

In [9]:
#start mapping process
start_time = datetime.datetime.now()

In [10]:
result = []
len_cus = len(customers_list)
i = 1
for customer in customers_list:
    row = [customer]
    qty = stat_df.select('total_qty').where('customer_number = {}'.format(customer)).rdd.map(lambda r: r[0]).collect()
    row.extend(qty)
    
#     qty = stat_df.select('product_type_code', 'total_qty').where('customer_number = {}'.format(customer)).collect()
#     for product_type in type_list:
#         value = [x.total_qty for x in qty if x.product_type_code == product_type]
#         if (len(value) > 0):
#             value = int(value[0])
#         else:
#             value = 0
#         row.append(value)
    result.append(row)
    if ((i % 1000 == 0) or (i == len_cus)):
        print ('{}/{} ----------- {} ----------- {} passed'.format(i, len_cus, row[0], datetime.datetime.now() - start_time))
    i += 1
#     print(row)

1000/35180 ----------- 12501 ----------- 0:44:52.288330 passed
2000/35180 ----------- 14978 ----------- 1:28:48.412595 passed
3000/35180 ----------- 17725 ----------- 2:12:46.733958 passed
4000/35180 ----------- 20471 ----------- 2:59:36.896570 passed
5000/35180 ----------- 22970 ----------- 3:46:50.591736 passed
6000/35180 ----------- 25621 ----------- 4:34:08.513472 passed
7000/35180 ----------- 28174 ----------- 5:21:50.389244 passed
8000/35180 ----------- 30825 ----------- 6:11:19.677638 passed
9000/35180 ----------- 33377 ----------- 7:01:31.566899 passed
10000/35180 ----------- 35870 ----------- 7:52:22.177691 passed
11000/35180 ----------- 38470 ----------- 8:43:27.082546 passed
12000/35180 ----------- 41020 ----------- 9:35:04.466940 passed
13000/35180 ----------- 43536 ----------- 10:28:40.628414 passed
14000/35180 ----------- 46090 ----------- 11:22:58.815747 passed
15000/35180 ----------- 48646 ----------- 12:17:31.622092 passed
16000/35180 ----------- 51099 ----------- 13:1

In [11]:
# test result
print(len(result))
print(len(result[0]))

35180
66


In [12]:
# create df
pdf = pd.DataFrame.from_records(result, columns = result_headers)
result_df = spark.createDataFrame(pdf)
result_df.show()

+---------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|customer_number| AA| AB| AC| AD| AE| AF| AG| AH| AI| AJ| AK| AL| AM| AN| AO| AP| AQ| AR| AS| AT| AU| AV| AW| AX| AY| AZ| BA| BB| BC| BD| BE| BF| BG| BH| BI| BJ| BK| BL| BM| BN| BO| BP| BQ| BR| BS| BT| BU| BV| BW| BX| BY| BZ| CA| CC| CD| CE| CF| CG| CH| CI| CJ| CK| CL| CM| CN|
+---------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|          10000|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 

In [13]:
# write result to csv
result_df.coalesce(1).write.option('header', 'true').csv('data/cus-prod-type-totalqty')

In [14]:
# done job
end_time = datetime.datetime.now()

# mapping time
print('Mapping process spent: \n',end_time - start_time)
print('Process end at: \n', end_time)

Mapping process spent: 
 1 day, 7:28:00.541851
Process end at: 
 2019-05-30 10:56:04.628325
