#### Spark

На Spark нам необходимо с помощью join создать плоскую витрину, содержащую информацию из всех трех таблиц:

- `adverts_data` – данные с характеристиками объявлений;

- `live_adverts` – данные с текущими объявлениями;

- `user_passports` – данные с характеристиками пользователей.

Результирующую таблицу в формате Parquet необходимо положить в директорию '/student/'.

In [1]:
#Spark connection with S3 options
import os
import socket
from pyspark.sql import SparkSession
import secret

# S3 credentials and connection parameters are read from the local `secret`
# module so they are not written out in the notebook.
aws_access_key = secret.aws_access_key
aws_secret_key = secret.aws_secret_key
s3_bucket = secret.s3_bucket
s3_endpoint_url = secret.s3_endpoint_url

# Put your karpov.courses username into student_directory.
student_directory = '/student/'

# Resolve the Spark master's host name to an IP and build the master URL.
APACHE_MASTER_IP = socket.gethostbyname("apache-spark-master-0.apache-spark-headless.apache-spark.svc.cluster.local")
APACHE_MASTER_URL = f"spark://{APACHE_MASTER_IP}:7077"
# NOTE(review): POD_IP is None when MY_POD_IP is not set — confirm the
# environment always provides it.
POD_IP = os.getenv("MY_POD_IP")
SPARK_APP_NAME = f"spark-{os.environ['HOSTNAME']}"

JARS = secret.JARS

# Executor sizing.
MEM = "512m"
CORES = 1

# Every Hadoop/S3A option is passed with the `spark.hadoop.` prefix, which is
# the documented way to forward a setting into the Hadoop configuration.
# The endpoint comes from `s3_endpoint_url` instead of a hard-coded URL
# (assumes the secret holds the same endpoint the notebook used before —
# TODO confirm).
spark = (
    SparkSession.builder
    .appName(SPARK_APP_NAME)
    .master(APACHE_MASTER_URL)
    .config("spark.executor.memory", MEM)
    .config("spark.jars", JARS)
    .config("spark.executor.cores", CORES)
    .config("spark.driver.host", POD_IP)
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint_url)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .getOrCreate()
)



24/01/02 09:25:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [13]:
# Load the three source Parquet datasets.
source_paths = (
    's3a://kc-hardda-projects/shared/adverts_data.parquet',
    's3a://kc-hardda-projects/shared/live_adverts.parquet',
    's3a://kc-hardda-projects/shared/user_passports.parquet',
)
df_adverts_all, df_live_adverts, df_user_passports = (
    spark.read.parquet(source_path) for source_path in source_paths
)

In [14]:
# Print the column names and types of the adverts dataset.
df_adverts_all.printSchema()

root
 |-- execution_date: timestamp (nullable = true)
 |-- advert_id: long (nullable = true)
 |-- mark: string (nullable = true)
 |-- model: string (nullable = true)
 |-- price: double (nullable = true)
 |-- year: double (nullable = true)
 |-- fuel: string (nullable = true)
 |-- color: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- body: string (nullable = true)
 |-- country: string (nullable = true)



In [15]:
# Keep one row per advert_id, preview the result, then count the rows.
adverts_unique = df_adverts_all.dropDuplicates(['advert_id'])
adverts_unique.show()
adverts_unique.count()

                                                                                

+-------------------+---------+----+----------+---------+------+------+-------+------------+-----+-------+
|     execution_date|advert_id|mark|     model|    price|  year|  fuel|  color|transmission| body|country|
+-------------------+---------+----+----------+---------+------+------+-------+------------+-----+-------+
|2021-04-13 00:00:00|126746009|null|      null|     null|  null|  null|   null|        null| null|   null|
|2021-09-15 00:00:00|129804737|null|      null|     null|  null|  null|   null|        null| null|   null|
|2020-11-21 00:00:00|130582517|null|      null|     null|  null|  null|   null|        null| null|   null|
|2021-08-29 00:00:00|131290038|null|      null|     null|  null|  null|   null|        null| null|   null|
|2021-01-02 00:00:00|131543541|null|      null|     null|  null|  null|   null|        null| null|   null|
|2021-09-06 00:00:00|132836073|null|      null|     null|  null|  null|   null|        null| null|   null|
|2021-05-31 00:00:00|134709471|null| 

                                                                                

79906

In [16]:
# Drop rows in which every column is null, then preview and count what is left.
adverts_non_empty = df_adverts_all.dropna(how='all')
adverts_non_empty.show()
adverts_non_empty.count()

                                                                                

+-------------------+---------+----+-----------+---------+------+------+------+------------+-----+-----------+
|     execution_date|advert_id|mark|      model|    price|  year|  fuel| color|transmission| body|    country|
+-------------------+---------+----+-----------+---------+------+------+------+------------+-----+-----------+
|2021-01-22 00:00:00|236576609|null|       null|     null|  null|  null|  null|        null| null|       null|
|2021-01-22 00:00:00|229759299|null|       null|   3000.0|  null|  null|  null|        null| null|       null|
|2021-02-19 00:00:00|228585004|null|       null| 240000.0|2011.0|  null|  null|        null| null|       null|
|2021-02-19 00:00:00|246519623|null|       null|   4500.0|  null|  null|  null|        null| null|       null|
|2021-06-30 00:00:00|236643061|null|       null|   7000.0|  null|  null|  null|        null| null|       null|
|2021-09-03 00:00:00|265857526| ГАЗ|ГАЗель NEXT|2593800.0|2021.0|бензин|  null|    механика|пикап|     Россия|
|

                                                                                

2778976

In [18]:
# Drop rows in which both execution_date and advert_id are null.
key_columns = ['execution_date', 'advert_id']
df_adverts_all.dropna(how='all', subset=key_columns).show()

[Stage 32:>                                                         (0 + 1) / 1]

+-------------------+---------+----+-----------+---------+------+------+------+------------+-----+-----------+
|     execution_date|advert_id|mark|      model|    price|  year|  fuel| color|transmission| body|    country|
+-------------------+---------+----+-----------+---------+------+------+------+------------+-----+-----------+
|2021-01-22 00:00:00|236576609|null|       null|     null|  null|  null|  null|        null| null|       null|
|2021-01-22 00:00:00|229759299|null|       null|   3000.0|  null|  null|  null|        null| null|       null|
|2021-02-19 00:00:00|228585004|null|       null| 240000.0|2011.0|  null|  null|        null| null|       null|
|2021-02-19 00:00:00|246519623|null|       null|   4500.0|  null|  null|  null|        null| null|       null|
|2021-06-30 00:00:00|236643061|null|       null|   7000.0|  null|  null|  null|        null| null|       null|
|2021-09-03 00:00:00|265857526| ГАЗ|ГАЗель NEXT|2593800.0|2021.0|бензин|  null|    механика|пикап|     Россия|
|

                                                                                

In [19]:
# Drop rows with fewer than five non-null columns, then preview and count.
adverts_mostly_filled = df_adverts_all.dropna(thresh=5)
adverts_mostly_filled.show()
adverts_mostly_filled.count()

                                                                                

+-------------------+---------+-------------+------------------+---------+------+----------+-------+------------+-----------+-----------+
|     execution_date|advert_id|         mark|             model|    price|  year|      fuel|  color|transmission|       body|    country|
+-------------------+---------+-------------+------------------+---------+------+----------+-------+------------+-----------+-----------+
|2021-09-03 00:00:00|265857526|          ГАЗ|       ГАЗель NEXT|2593800.0|2021.0|    бензин|   null|    механика|      пикап|     Россия|
|2021-09-08 00:00:00|262624023|          Kia|                K7|3400000.0|2020.0|    бензин| черный|     автомат|      седан|Южная Корея|
|2021-01-03 00:00:00|251470815|         null|              null|4898000.0|2021.0|    дизель|   null|        null|       null|       null|
|2020-12-04 00:00:00|251095460|         null|              null|5798000.0|2021.0|    дизель|   null|        null|       null|       null|
|2020-11-17 00:00:00|252096571|   

                                                                                

217846

In [20]:
# Print the column names and types of the live adverts dataset.
df_live_adverts.printSchema()

root
 |-- execution_date: timestamp (nullable = true)
 |-- advert_id: long (nullable = true)
 |-- region: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- platform: string (nullable = true)



In [22]:
# One row per advert_id: preview and count.
live_unique = df_live_adverts.dropDuplicates(['advert_id'])
live_unique.show()
print(live_unique.count())

# Drop rows in which every column is null.
live_non_empty = df_live_adverts.dropna(how='all')
live_non_empty.show()
print(live_non_empty.count())

# Drop rows in which both execution_date and advert_id are null.
live_key_columns = ['execution_date', 'advert_id']
df_live_adverts.dropna(how='all', subset=live_key_columns).show()

# Drop rows with fewer than three non-null columns.
live_mostly_filled = df_live_adverts.dropna(thresh=3)
live_mostly_filled.show()
print(live_mostly_filled.count())

                                                                                

+-------------------+---------+------+---------+--------+
|     execution_date|advert_id|region|  user_id|platform|
+-------------------+---------+------+---------+--------+
|2021-05-18 00:00:00|126746009|  Омск|123628037| unknown|
|2021-08-04 00:00:00|129804737|  Омск|123559152| unknown|
|2021-11-02 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-05-15 00:00:00|131290038|Казань|123661493| unknown|
|2021-01-16 00:00:00|131543541|Казань|123661493| unknown|
|2021-09-27 00:00:00|132836073|Казань|123858606| desktop|
|2021-03-22 00:00:00|134709471|  Омск|123482031| desktop|
|2021-09-15 00:00:00|136110872|   Уфа|124065985| desktop|
|2021-04-19 00:00:00|136669101|  Омск|124211439| desktop|
|2021-03-17 00:00:00|137298690|  Сочи|123924920| desktop|
|2021-04-03 00:00:00|137489975|  Омск|124604878| desktop|
|2021-08-30 00:00:00|137571386|   Уфа|123462866| desktop|
|2021-08-05 00:00:00|138018608|  Сочи|123924920| desktop|
|2021-08-21 00:00:00|138018640|  Сочи|123924920| desktop|
|2021-05-08 00



79806


                                                                                

+-------------------+---------+------+---------+--------+
|     execution_date|advert_id|region|  user_id|platform|
+-------------------+---------+------+---------+--------+
|2021-06-26 00:00:00|145314141|   Уфа|124243239| unknown|
|2021-04-03 00:00:00|145314141|   Уфа|124243239| unknown|
|2021-09-20 00:00:00|137514150|  Омск|124207514| unknown|
|2021-02-17 00:00:00|129804737|  Омск|123559152| unknown|
|2021-06-26 00:00:00|137514150|  Омск|124207514| unknown|
|2020-12-05 00:00:00|137514150|  Омск|124207514| unknown|
|2021-05-18 00:00:00|126746009|  Омск|123628037| unknown|
|2021-08-23 00:00:00|137514150|  Омск|124207514| unknown|
|2021-05-16 00:00:00|127593536|  Омск|123628037| unknown|
|2021-04-30 00:00:00|131000137|  Омск|123628037| unknown|
|2021-01-22 00:00:00|131000137|  Омск|123628037| unknown|
|2021-09-07 00:00:00|126746009|  Омск|123628037| unknown|
|2020-12-05 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-04-06 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-04-05 00

                                                                                

2771661


                                                                                

+-------------------+---------+------+---------+--------+
|     execution_date|advert_id|region|  user_id|platform|
+-------------------+---------+------+---------+--------+
|2021-06-26 00:00:00|145314141|   Уфа|124243239| unknown|
|2021-04-03 00:00:00|145314141|   Уфа|124243239| unknown|
|2021-09-20 00:00:00|137514150|  Омск|124207514| unknown|
|2021-02-17 00:00:00|129804737|  Омск|123559152| unknown|
|2021-06-26 00:00:00|137514150|  Омск|124207514| unknown|
|2020-12-05 00:00:00|137514150|  Омск|124207514| unknown|
|2021-05-18 00:00:00|126746009|  Омск|123628037| unknown|
|2021-08-23 00:00:00|137514150|  Омск|124207514| unknown|
|2021-05-16 00:00:00|127593536|  Омск|123628037| unknown|
|2021-04-30 00:00:00|131000137|  Омск|123628037| unknown|
|2021-01-22 00:00:00|131000137|  Омск|123628037| unknown|
|2021-09-07 00:00:00|126746009|  Омск|123628037| unknown|
|2020-12-05 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-04-06 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-04-05 00

                                                                                

+-------------------+---------+------+---------+--------+
|     execution_date|advert_id|region|  user_id|platform|
+-------------------+---------+------+---------+--------+
|2021-06-26 00:00:00|145314141|   Уфа|124243239| unknown|
|2021-04-03 00:00:00|145314141|   Уфа|124243239| unknown|
|2021-09-20 00:00:00|137514150|  Омск|124207514| unknown|
|2021-02-17 00:00:00|129804737|  Омск|123559152| unknown|
|2021-06-26 00:00:00|137514150|  Омск|124207514| unknown|
|2020-12-05 00:00:00|137514150|  Омск|124207514| unknown|
|2021-05-18 00:00:00|126746009|  Омск|123628037| unknown|
|2021-08-23 00:00:00|137514150|  Омск|124207514| unknown|
|2021-05-16 00:00:00|127593536|  Омск|123628037| unknown|
|2021-04-30 00:00:00|131000137|  Омск|123628037| unknown|
|2021-01-22 00:00:00|131000137|  Омск|123628037| unknown|
|2021-09-07 00:00:00|126746009|  Омск|123628037| unknown|
|2020-12-05 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-04-06 00:00:00|130582517|  Сочи|123464584| unknown|
|2021-04-05 00



2771661


                                                                                

In [23]:
# Print the column names and types of the user passports dataset.
df_user_passports.printSchema()

root
 |-- global_id: long (nullable = true)
 |-- user_type_name: string (nullable = true)



In [25]:
# One row per global_id.
passports_unique = df_user_passports.dropDuplicates(['global_id'])
passports_unique.show()

# Drop rows in which every column is null.
passports_non_empty = df_user_passports.dropna(how='all')
passports_non_empty.show()

# Drop rows in which global_id is null.
passports_with_id = df_user_passports.dropna(how='all', subset=['global_id'])
passports_with_id.show()

                                                                                

+---------+--------------+
|global_id|user_type_name|
+---------+--------------+
|123456782|   simple_user|
|123456794|         profi|
|123456796|   simple_user|
|123456800|   simple_user|
|123456801|   simple_user|
|123456813|   simple_user|
|123456816|   simple_user|
|123456826|   simple_user|
|123456831|   simple_user|
|123456855|   simple_user|
|123456858|   simple_user|
|123456860|   simple_user|
|123456863|   simple_user|
|123456865|   simple_user|
|123456870|   simple_user|
|123456874|   simple_user|
|123456879|   simple_user|
|123456884|   simple_user|
|123456891|   simple_user|
|123456896|   simple_user|
+---------+--------------+
only showing top 20 rows

+---------+--------------+
|global_id|user_type_name|
+---------+--------------+
|144746200|   simple_user|
|144758536|   simple_user|
|144974229|   simple_user|
|144992935|   simple_user|
|137375204|   simple_user|
|148358754|   simple_user|
|148410169|   simple_user|
|148543885|   simple_user|
|148683888|   simple_user|
|1

                                                                                

In [26]:
# Build the flat table: live adverts left-joined with the advert attributes
# on (execution_date, advert_id), then left-joined with the user passports
# where user_id equals global_id.
advert_keys = ['execution_date', 'advert_id']
passport_match = df_live_adverts['user_id'] == df_user_passports['global_id']
df_flat = (
    df_live_adverts
    .join(df_adverts_all, on=advert_keys, how='left')
    .join(df_user_passports, on=passport_match, how='left')
)

In [27]:
# Write the flat table to S3 as Parquet from a single partition, replacing
# any previous output at that path.
flat_table_path = "s3a://kc-hardda-projects/ju-kolesnikova/flat_table"
single_partition = df_flat.coalesce(1)
single_partition.write.save(flat_table_path, format="parquet", mode='overwrite')

                                                                                

In [None]:
# Collect the flat table into a pandas DataFrame and save a local Parquet copy.
# No coalesce(1) before toPandas(): the call already gathers every partition
# on the driver, so collapsing to one partition first only pushes the whole
# job through a single task without changing the result.
pandas_df = df_flat.toPandas()
pandas_df.to_parquet('flat_table.parquet')

  series = series.astype(t, copy=False)


In [None]:
# Stop the Spark session once the work above is finished.
spark.stop()