
# **Running PySpark in Colab**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

# see: https://stackoverflow.com/questions/55240940/error-while-installing-spark-on-google-colab

In [2]:
import os

# Tell the Spark tooling where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

In [3]:
import findspark

# Run findspark before importing pyspark (the original order is preserved);
# presumably it uses SPARK_HOME to make pyspark importable - confirm.
findspark.init()

from pyspark.sql import SparkSession

# Reuse an existing session if there is one, otherwise start one with the
# "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

### Download data from drive

In [4]:
import os       # file-system helpers
import time     # time utilities
import zipfile  # reading zip archives

from google.colab import drive  # Google Drive integration for Colab

# Mount Google Drive into the Colab file system
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Path to the dataset archive on the mounted Google Drive
zip_file = '/content/drive/My Drive/bee/datasets.zip'

# Extract everything into the current working directory. The context manager
# closes the archive handle even if extraction fails (the handle was previously
# left open).
with zipfile.ZipFile(zip_file, 'r') as archive:
    archive.extractall()

# List the working directory to confirm what was extracted
print(os.listdir())

['.config', 'drive', 'spark-3.0.0-bin-hadoop3.2', 'mobile_client.json', 'spark-3.0.0-bin-hadoop3.2.tgz', 'aerospike.tgz', 'parent_operator.csv', 'aerospike-server-community-5.7.0.8-ubuntu18.04', 'agg_usage.parquet', 'web_client.json', 'spark-3.0.0-bin-hadoop3.2.tgz.1', 'sample_data']


### Read the operator reference table

In [6]:
# Operator reference table: the first CSV row is the header and the column
# types are inferred from the data.
operators = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("parent_operator.csv")
)

In [7]:
#operators.show(5, truncate=False)

+--------------------+--------------------+-------------------+---------------+-----------------------+------------------+
|parent_operator_name|parent_operator_code|group_operator_code|land_mobile_ind|as_default_operator_ind|ofa_operator_group|
+--------------------+--------------------+-------------------+---------------+-----------------------+------------------+
|operator_198        |198                 |null               |1.0            |null                   |null              |
|operator_199        |199                 |null               |1.0            |null                   |null              |
|operator_200        |200                 |null               |1.0            |null                   |null              |
|operator_202        |202                 |null               |1.0            |null                   |null              |
|operator_876        |876                 |null               |null           |null                   |null              |
+---------------

In [8]:
#operators.printSchema()

root
 |-- parent_operator_name: string (nullable = true)
 |-- parent_operator_code: string (nullable = true)
 |-- group_operator_code: string (nullable = true)
 |-- land_mobile_ind: double (nullable = true)
 |-- as_default_operator_ind: string (nullable = true)
 |-- ofa_operator_group: string (nullable = true)



### Read parquet

In [9]:
# Load agg_usage.parquet into a DataFrame.
dataset = spark.read.format("parquet").load("agg_usage.parquet")

In [10]:
#dataset.show(5, truncate=False)

+-------------------+------------------+-------------+--------------+-------------------+-----------------+------------+-----------+----------------------+----------------+--------------------+---------+---------+
|actual_call_dur_sec|call_direction_ind|call_type_key|charge_amt_rur|connection_type_key|location_type_key|network_type|num_of_call|parent_operator_code_b|roaming_type_key|rounded_data_volume |time_key |client_id|
+-------------------+------------------+-------------+--------------+-------------------+-----------------+------------+-----------+----------------------+----------------+--------------------+---------+---------+
|115.0              |1                 |V            |0.0           |3                  |2                |G           |1.0        |136                   |H               |1.817427842689825   |P20171025|0728799  |
|122.0              |2                 |V            |0.0           |3                  |2                |G           |1.0        |130         

In [11]:
#dataset.printSchema()

root
 |-- actual_call_dur_sec: double (nullable = true)
 |-- call_direction_ind: string (nullable = true)
 |-- call_type_key: string (nullable = true)
 |-- charge_amt_rur: double (nullable = true)
 |-- connection_type_key: string (nullable = true)
 |-- location_type_key: string (nullable = true)
 |-- network_type: string (nullable = true)
 |-- num_of_call: double (nullable = true)
 |-- parent_operator_code_b: string (nullable = true)
 |-- roaming_type_key: string (nullable = true)
 |-- rounded_data_volume: double (nullable = true)
 |-- time_key: string (nullable = true)
 |-- client_id: string (nullable = true)



### Join parquet and csv

In [12]:
# Attach the operator attributes to every usage row. Left outer join: usage
# rows without a matching operator are kept, with nulls in the operator columns.
#
# The result is bound back to `dataset`, so the guard keeps this cell
# idempotent: re-running it would otherwise join the operator columns a second
# time and leave duplicated (ambiguous) column names.
if "parent_operator_code" not in dataset.columns:
    dataset = dataset.join(
        operators,
        dataset.parent_operator_code_b == operators.parent_operator_code,
        how='left_outer',
    )

In [13]:
#dataset.show(5, truncate=False)

+-------------------+------------------+-------------+--------------+-------------------+-----------------+------------+-----------+----------------------+----------------+--------------------+---------+---------+--------------------+--------------------+-------------------+---------------+-----------------------+------------------+
|actual_call_dur_sec|call_direction_ind|call_type_key|charge_amt_rur|connection_type_key|location_type_key|network_type|num_of_call|parent_operator_code_b|roaming_type_key|rounded_data_volume |time_key |client_id|parent_operator_name|parent_operator_code|group_operator_code|land_mobile_ind|as_default_operator_ind|ofa_operator_group|
+-------------------+------------------+-------------+--------------+-------------------+-----------------+------------+-----------+----------------------+----------------+--------------------+---------+---------+--------------------+--------------------+-------------------+---------------+-----------------------+---------------

In [14]:
#dataset.printSchema()

root
 |-- actual_call_dur_sec: double (nullable = true)
 |-- call_direction_ind: string (nullable = true)
 |-- call_type_key: string (nullable = true)
 |-- charge_amt_rur: double (nullable = true)
 |-- connection_type_key: string (nullable = true)
 |-- location_type_key: string (nullable = true)
 |-- network_type: string (nullable = true)
 |-- num_of_call: double (nullable = true)
 |-- parent_operator_code_b: string (nullable = true)
 |-- roaming_type_key: string (nullable = true)
 |-- rounded_data_volume: double (nullable = true)
 |-- time_key: string (nullable = true)
 |-- client_id: string (nullable = true)
 |-- parent_operator_name: string (nullable = true)
 |-- parent_operator_code: string (nullable = true)
 |-- group_operator_code: string (nullable = true)
 |-- land_mobile_ind: double (nullable = true)
 |-- as_default_operator_ind: string (nullable = true)
 |-- ofa_operator_group: string (nullable = true)



### Create dataset data_traffic

In [15]:
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

In [16]:
# Aggregations over rounded_data_volume

# Filters shared by every metric in this cell
call_type_ok = F.col("call_type_key").isin(['G', 'Y', 'X'])
roaming_type_ok = F.col("roaming_type_key").isin(['X', 'R', 'H'])

# The "all" filter, then the same filter narrowed by network_type
data_all_mb = call_type_ok & roaming_type_ok
data_4g_mb = data_all_mb & (F.col("network_type") == 'L')
data_3g_mb = data_all_mb & (F.col("network_type") == 'G')

names_volume = ["data_4g_mb", "data_all_mb", "data_3g_mb"]
all_conds = [data_4g_mb, data_all_mb, data_3g_mb]

# Sum the volume of the matching rows, converted from bytes to megabytes
# (1048576 = 1024 * 1024). Rows that do not match yield null and are ignored
# by the sum.
exprs_data_volume = [
    (
        F.sum(F.when(condition, F.col("rounded_data_volume") / 1048576))
        .cast("double")
        .alias(metric_name)
    )
    for metric_name, condition in zip(names_volume, all_conds)
]

In [17]:
# One row per client and time_key with every data-volume metric summed.
group_keys = ["client_id", "time_key"]
data_traffic = (
    dataset
    .groupby(*group_keys)
    .agg(*exprs_data_volume)
    .select(group_keys + names_volume)
)

# time_key looks like "P20171025": take the eight characters after the first,
# parse them as yyyyMMdd and keep the first ten characters of the result
# ("2017-10-25").
time_expr = F.to_timestamp(F.col("time_key").substr(2, 8), 'yyyyMMdd')
day_expr = time_expr.substr(1, 10)
data_traffic = data_traffic.withColumn("time_key", day_expr)

#data_traffic.show(10, truncate=False)

+---------+----------+------------------+------------------+------------------+
|client_id|time_key  |data_4g_mb        |data_all_mb       |data_3g_mb        |
+---------+----------+------------------+------------------+------------------+
|0730208  |2017-10-25|108.13695664004459|108.13695664004459|null              |
|0725527  |2017-10-21|948.3228443579095 |985.8001942858766 |37.47734992796713 |
|0734188  |2017-10-20|2353.989691438321 |2867.961480048383 |513.9717886100623 |
|0733202  |2017-10-21|495.96789404341615|538.040283702559  |42.07238965914287 |
|0733561  |2017-10-24|null              |null              |null              |
|0722239  |2017-10-25|null              |960.5745088091671 |960.5745088091671 |
|0731839  |2017-10-26|6496.493439448048 |6496.493439448048 |null              |
|0732443  |2017-10-20|267.8274548295143 |335.4308010483185 |67.60334621880413 |
|0719068  |2017-10-25|185.8319969150346 |186.74071083637952|0.9087139213449125|
|0728085  |2017-10-21|null              

### Create dataset voice_traffic

In [18]:
# Aggregations over num_of_call

# Building blocks reused by the metrics below
incoming = F.col("call_direction_ind") == 1
outgoing = F.col("call_direction_ind") == 2
intercity = F.col("location_type_key") == 1
international = F.col("location_type_key") == 3
voice_call = F.col("call_type_key") == 'V'
connection_3 = F.col("connection_type_key") == 3
bln_group = F.col("group_operator_code") == 'BLN'

# Metric name -> row filter; insertion order fixes the output column order.
call_conditions = {
    "voice_in_cnt": incoming,
    "voice_onnet_in_cnt": voice_call & incoming & connection_3 & bln_group,
    "voice_intercity_in_cnt": intercity & incoming,
    "voice_onnet_out_cnt": voice_call & outgoing & connection_3 & bln_group,
    "voice_out_cnt": outgoing,
    "voice_intercity_out_cnt": intercity & outgoing,
    # Author's note: the task specification described this metric incorrectly.
    # NOTE(review): "intenational" is misspelled, but it is the output column
    # name, so it is kept unchanged.
    "voice_intenational_in_cnt": international & incoming,
}

names_call = list(call_conditions)
all_conds = list(call_conditions.values())

# Sum num_of_call over the rows matching each filter
exprs_num_of_calls = [
    F.sum(F.when(condition, F.col("num_of_call"))).cast("double").alias(metric_name)
    for metric_name, condition in call_conditions.items()
]

In [19]:
# Aggregations over actual_call_dur_sec (plus one call-count metric, see below)

# Building blocks reused by the metrics below
incoming = F.col("call_direction_ind") == 1
outgoing = F.col("call_direction_ind") == 2
intercity = F.col("location_type_key") == 1
international = F.col("location_type_key") == 3
voice_call = F.col("call_type_key") == 'V'
connection_3 = F.col("connection_type_key") == 3
bln_group = F.col("group_operator_code") == 'BLN'
charged = F.col("charge_amt_rur") > 0

# Metric name -> row filter; insertion order fixes the output column order.
duration_conditions = {
    "voice_international_in_sec": international & incoming,
    "voice_intercity_out_sec": intercity & outgoing,
    "voice_out_sec": outgoing,
    "voice_out_nopackage_sec": outgoing & charged,
    "voice_international_out_cnt": international & outgoing,
    "voice_onnet_out_sec": voice_call & outgoing & connection_3 & bln_group,
    "voice_in_sec": incoming,
    "voice_onnet_in_sec": voice_call & incoming & connection_3 & bln_group,
    "voice_intercity_in_sec": intercity & incoming,
    "voice_international_out_sec": international & outgoing,
}

names_sec = list(duration_conditions)
all_conds = list(duration_conditions.values())

# "*_sec" metrics sum the call duration. "voice_international_out_cnt" is a call
# count that lives in this list, so it must sum num_of_call: summing the duration
# (as before) made it an exact duplicate of "voice_international_out_sec".
# NOTE(review): confirm against the task specification.
exprs_actual_call_dur_sec = [
    F.sum(
        F.when(
            condition,
            F.col("num_of_call" if metric_name.endswith("_cnt") else "actual_call_dur_sec"),
        )
    ).cast("double").alias(metric_name)
    for metric_name, condition in duration_conditions.items()
]

In [20]:
# One row per client and time_key with every call-count and call-duration metric.
voice_metric_exprs = exprs_num_of_calls + exprs_actual_call_dur_sec
voice_columns = ["client_id", "time_key"] + names_call + names_sec

voice_traffic = (
    dataset
    .groupby("client_id", "time_key")
    .agg(*voice_metric_exprs)
    .select(voice_columns)
)

# Take the eight characters after the first character of time_key, parse them
# as yyyyMMdd and keep the first ten characters of the result.
time_expr = F.to_timestamp(F.col("time_key").substr(2, 8), 'yyyyMMdd')
voice_traffic = voice_traffic.withColumn("time_key", time_expr.substr(1, 10))

#voice_traffic.show(10, truncate=False)

+---------+----------+------------+------------------+----------------------+-------------------+-------------+-----------------------+-------------------------+--------------------------+-----------------------+-------------+-----------------------+---------------------------+-------------------+------------+------------------+----------------------+---------------------------+
|client_id|time_key  |voice_in_cnt|voice_onnet_in_cnt|voice_intercity_in_cnt|voice_onnet_out_cnt|voice_out_cnt|voice_intercity_out_cnt|voice_intenational_in_cnt|voice_international_in_sec|voice_intercity_out_sec|voice_out_sec|voice_out_nopackage_sec|voice_international_out_cnt|voice_onnet_out_sec|voice_in_sec|voice_onnet_in_sec|voice_intercity_in_sec|voice_international_out_sec|
+---------+----------+------------+------------------+----------------------+-------------------+-------------+-----------------------+-------------------------+--------------------------+-----------------------+-------------+----------

### Join with data_traffic => client_profile

In [21]:
# Combine the voice and data metrics per client and day; the full outer join
# keeps rows that exist on only one side.
daily_profile = voice_traffic.join(
    data_traffic,
    on=["client_id", "time_key"],
    how='full_outer',
)

# Reduce time_key to its first seven characters ("YYYY-mm")
monthly_profile = daily_profile.withColumn(
    "time_key",
    F.col("time_key").substr(1, 7),
)

# Sum every metric column per client and month
col_list = [
    name for name in monthly_profile.columns
    if name not in ("client_id", "time_key")
]
client_profile = (
    monthly_profile
    .groupBy("client_id", "time_key")
    .agg(*[F.sum(metric).alias(metric) for metric in col_list])
)

In [22]:
#client_profile.show(10, truncate=False)

+---------+--------+------------+------------------+----------------------+-------------------+-------------+-----------------------+-------------------------+--------------------------+-----------------------+-------------+-----------------------+---------------------------+-------------------+------------+------------------+----------------------+---------------------------+-----------------+------------------+-----------------+
|client_id|time_key|voice_in_cnt|voice_onnet_in_cnt|voice_intercity_in_cnt|voice_onnet_out_cnt|voice_out_cnt|voice_intercity_out_cnt|voice_intenational_in_cnt|voice_international_in_sec|voice_intercity_out_sec|voice_out_sec|voice_out_nopackage_sec|voice_international_out_cnt|voice_onnet_out_sec|voice_in_sec|voice_onnet_in_sec|voice_intercity_in_sec|voice_international_out_sec|data_4g_mb       |data_all_mb       |data_3g_mb       |
+---------+--------+------------+------------------+----------------------+-------------------+-------------+---------------------

### Сохранение в MongoDB

In [23]:
!apt install mongodb
!service mongodb start

Reading package lists... Done
Building dependency tree       
Reading state information... Done
mongodb is already the newest version (1:3.6.3-0ubuntu1.4).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.
 * Starting database mongodb
   ...done.


In [24]:
import json

import pymongo
from pymongo import InsertOne, MongoClient

# No arguments: connect using pymongo's default host and port.
client = MongoClient()

# Display the databases visible through this connection.
client.list_database_names()

['admin', 'config', 'local']

In [None]:
# Save data_traffic to MongoDB (database "local", collection
# "data_traffic_collection").
collection = client.local.data_traffic_collection

# toJSON() yields one JSON string per row; collect() brings all of them to the
# driver, so this assumes the result fits in driver memory.
requesting = [
    InsertOne(json.loads(row_json))
    for row_json in data_traffic.toJSON().collect()
]

# bulk_write() raises InvalidOperation for an empty request list, so skip the
# call when there is nothing to insert.
result = collection.bulk_write(requesting) if requesting else None

# The client is deliberately left open here: the cells that follow reuse
# `client`, and on PyMongo 4+ a closed MongoClient cannot run further operations.

In [None]:
# Save voice_traffic to MongoDB (database "local", collection
# "voice_traffic_collection").
collection = client.local.voice_traffic_collection

# toJSON() yields one JSON string per row; collect() brings all of them to the
# driver, so this assumes the result fits in driver memory.
requesting = [
    InsertOne(json.loads(row_json))
    for row_json in voice_traffic.toJSON().collect()
]

# bulk_write() raises InvalidOperation for an empty request list, so skip the
# call when there is nothing to insert.
result = collection.bulk_write(requesting) if requesting else None

# The client is deliberately left open here: the next cell reuses `client`, and
# on PyMongo 4+ a closed MongoClient cannot run further operations.

In [None]:
# Save client_profile to MongoDB (database "local", collection
# "client_profile_collection").
collection = client.local.client_profile_collection

# toJSON() yields one JSON string per row; collect() brings all of them to the
# driver, so this assumes the result fits in driver memory.
requesting = [
    InsertOne(json.loads(row_json))
    for row_json in client_profile.toJSON().collect()
]

# bulk_write() raises InvalidOperation for an empty request list, so skip the
# call when there is nothing to insert.
result = collection.bulk_write(requesting) if requesting else None

# No later cell in the visible notebook uses `client`, so release the
# connection here.
client.close()

### Был вариант сохранения в SQLite

In [34]:
# Disabled cell: the commands are wrapped in a string literal, so nothing is
# executed and the cell only echoes the string.
# The commands would download the SQLite source, generate sqlite3.c, build
# coleifer/pysqlite3 against it and copy the resulting extension module into
# Python 3.7's lib-dynload directory.
"""
!curl https://www.sqlite.org/src/tarball/sqlite.tar.gz?r=release | tar xz
%cd sqlite/
!./configure
!make sqlite3.c
%cd /content
!npx degit coleifer/pysqlite3 -f
!cp sqlite/sqlite3.[ch] .
!python setup.py build_static build
!cp build/lib.linux-x86_64-3.7/pysqlite3/_sqlite3.cpython-37m-x86_64-linux-gnu.so \
     /usr/lib/python3.7/lib-dynload/
"""

'\n!curl https://www.sqlite.org/src/tarball/sqlite.tar.gz?r=release | tar xz\n%cd sqlite/\n!./configure\n!make sqlite3.c\n%cd /content\n!npx degit coleifer/pysqlite3 -f\n!cp sqlite/sqlite3.[ch] .\n!python setup.py build_static build\n!cp build/lib.linux-x86_64-3.7/pysqlite3/_sqlite3.cpython-37m-x86_64-linux-gnu.so      /usr/lib/python3.7/lib-dynload/\n'

In [None]:
# Disabled cell: the code is wrapped in a string literal and is not executed.
# NOTE(review): `create_connection` is not defined anywhere in the visible
# notebook, and it is given a Windows-style path; the `cnx` connection created
# here is an in-memory SQLite database.
"""
import sqlite3
from sqlite3 import Error

create_connection(r"C:\sqlite\db\pythonsqlite.db")
cnx = sqlite3.connect(':memory:')
"""

In [None]:
# Disabled cell: the code is wrapped in a string literal and is not executed.
# It would convert each Spark DataFrame to pandas and write it to a table of
# the same name through the `cnx` connection.
"""
data_traffic.toPandas().to_sql(name='data_traffic', con=cnx)
voice_traffic.toPandas().to_sql(name='voice_traffic', con=cnx)
client_profile.toPandas().to_sql(name='client_profile', con=cnx)
"""

### Попытки поднять Aerospike увенчались провалом

In [None]:
# Disabled cell: the commands are wrapped in a string literal and are not
# executed. They would download and unpack the Aerospike community server
# 5.7.0.8 for Ubuntu 18.04 and run its asinstall script; the service commands
# are additionally commented out inside the string.
"""
!wget -O aerospike.tgz https://ru.aerospike.com/download/server/5.7.0.8/artifact/ubuntu18
!tar -xvf aerospike.tgz
%cd aerospike-server-community-5.7.0.8-ubuntu18.04
!./asinstall
#!service --status-all
#!service aerospike start
"""

In [None]:
# Disabled cell: the code is wrapped in a string literal and is not executed.
# It would define the Aerospike connection settings and put the Aerospike
# Spark connector jar into PYSPARK_SUBMIT_ARGS.
# NOTE(review): the snippet hard-codes a host IP address; move it to
# configuration if this cell is ever re-enabled.
"""
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="35.188.254.184"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test" 
AEROSPIKE_SPARK_JAR_VERSION="3.1.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)

import os 
AEROSPIKE_JAR_PATH= "aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
"""

In [None]:
# Disabled cell: the code is wrapped in a string literal and is not executed.
# NOTE(review): the snippet's own comment and its "aerospike.writeset" option
# both say data_traffic, but the DataFrame being written is `dataset`; fix this
# if the cell is ever re-enabled.
"""
# Write data_traffic to Aerospike

dataset \
.write \
.mode('overwrite') \
.format("aerospike")  \
.option("aerospike.writeset", "data_traffic")\
.option("aerospike.updateByKey", "client_id") \
.save()
"""

Py4JJavaError: ignored