# Подготовка таблиц

In [1]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10


In [2]:
import psycopg2 as pg

In [3]:
# Open a psycopg2 connection used to apply the DDL script.
# Settings come from PG_* environment variables so the password is not
# committed with the notebook; non-secret values keep their old defaults.
import os
from getpass import getpass

if "PG_PASSWORD" not in os.environ:
    # Prompt once and cache the answer in the process environment so later
    # cells in the same kernel can read it without prompting again.
    os.environ["PG_PASSWORD"] = getpass("PostgreSQL password: ")

conn = pg.connect(
    dbname=os.environ.get("PG_DBNAME", "big_data_snowflake"),
    user=os.environ.get("PG_USER", "admin"),
    password=os.environ["PG_PASSWORD"],
    host=os.environ.get("PG_HOST", "demo13b.ddnsfree.com"),
    port=os.environ.get("PG_PORT", "30000"),
)

In [4]:
# Run the schema script ddl.sql through the psycopg2 connection, commit, and
# release the connection. The cursor and the connection are closed even when
# reading or executing the script fails; an uncommitted transaction is
# discarded on close.
try:
    with conn.cursor() as cur, open('ddl.sql', 'r', encoding='utf-8') as f:
        cur.execute(f.read())
    conn.commit()
finally:
    conn.close()

# Загрузка основного датасета из БД

In [5]:
from pyspark.sql import SparkSession

In [6]:
import os

# Put the PostgreSQL JDBC driver on Spark's classpath. PYSPARK_SUBMIT_ARGS is
# read when the JVM is launched (the SparkSession cell), so this must run
# first. The jar location can be overridden with POSTGRES_JDBC_JAR instead of
# editing this absolute path.
POSTGRES_JDBC_JAR = os.environ.get(
    'POSTGRES_JDBC_JAR', '/home/jovyan/work/jars/postgresql-42.7.3.jar'
)
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars {POSTGRES_JDBC_JAR} pyspark-shell'

In [7]:
# Single SparkSession for the notebook; getOrCreate() hands back the already
# running session if this cell is executed again.
spark = (
    SparkSession.builder
    .appName("DataLoader")
    .getOrCreate()
)

In [8]:
# JDBC connection settings for Spark reads/writes. Values come from PG_*
# environment variables so credentials are not hard-coded in the notebook;
# non-secret values keep their old defaults.
import os

if "PG_PASSWORD" not in os.environ:
    from getpass import getpass
    # Prompt once and cache in the environment for the rest of the kernel.
    os.environ["PG_PASSWORD"] = getpass("PostgreSQL password: ")

jdbc_url = "jdbc:postgresql://{host}:{port}/{db}".format(
    host=os.environ.get("PG_HOST", "demo13b.ddnsfree.com"),
    port=os.environ.get("PG_PORT", "30000"),
    db=os.environ.get("PG_DBNAME", "big_data_snowflake"),
)
properties = {
    "user": os.environ.get("PG_USER", "admin"),
    "password": os.environ["PG_PASSWORD"],
    "driver": "org.postgresql.Driver",
}

In [9]:
# Raw denormalised source table; every dimension table below is selected from it.
mock_data_df = spark.read.jdbc(jdbc_url, 'mock_data', properties=properties)

In [10]:
# Peek at the first source row to check the column names and types.
mock_data_df.first()

Row(id=1, customer_first_name='Barron', customer_last_name='Rawlyns', customer_age=61, customer_email='bmassingham0@army.mil', customer_country='China', customer_postal_code='', customer_pet_type='cat', customer_pet_name='Priscella', customer_pet_breed='Labrador Retriever', seller_first_name='Bevan', seller_last_name='Massingham', seller_email='bmassingham0@answers.com', seller_country='Indonesia', seller_postal_code='', product_name='Dog Food', product_category='Food', product_price=77.97000122070312, product_quantity=89, sale_date='5/14/2021', sale_customer_id=1, sale_seller_id=1, sale_product_id=1, sale_quantity=4, sale_total_price=487.70001220703125, store_name='Youopia', store_location='Suite 75', store_city='Xichehe', store_state='', store_country='United States', store_phone='564-244-8660', store_email='bmassingham0@networkadvertising.org', pet_category='Cats', product_weight=13.399999618530273, product_color='Indigo', product_size='Medium', product_brand='Skajo', product_materi

# Заполнение таблиц

### Покупатели

In [11]:
# One row per distinct combination of all customer_* attributes.
customer_source_columns = [
    f'customer_{field}'
    for field in (
        'first_name', 'last_name', 'age', 'email', 'country',
        'postal_code', 'pet_type', 'pet_name', 'pet_breed',
    )
]
customers_df = mock_data_df.select(*customer_source_columns).distinct()

In [12]:
# Strip the "customer_" prefix so the names match the target customers table.
# Columns without the prefix are left alone, so re-running is a no-op.
customer_prefix = 'customer_'
for source_column in customers_df.columns:
    if source_column.startswith(customer_prefix):
        customers_df = customers_df.withColumnRenamed(
            source_column, source_column[len(customer_prefix):]
        )

In [13]:
# Preview the first 20 customers (long values are truncated in the display).
customers_df.show(n=20, truncate=True)

+----------+----------+---+--------------------+-------------+-----------+--------+--------+------------------+
|first_name| last_name|age|               email|      country|postal_code|pet_type|pet_name|         pet_breed|
+----------+----------+---+--------------------+-------------+-----------+--------+--------+------------------+
|    Woodie| Barnsdale| 66|rbutchardcv@arste...|       France|49117 CEDEX|    bird|    Bing|          Parakeet|
|    Karlik|  Hansford| 57|wwakefordgl@sitem...|    Indonesia|           |     dog| Chiquia|Labrador Retriever|
|   Sherrie|   Flatley| 38|epinnigerkx@print...|        Japan|   979-1543|    bird|Felicdad|Labrador Retriever|
|     Carry|      Brea| 51|rlervenmy@ustream.tv| South Africa|       8468|     cat|   Fabio|          Parakeet|
|      Elli|  Cunniffe| 36|ilidgertwood5x@sf...|  Philippines|       4326|    bird|Jermaine|          Parakeet|
|    Emelen|   Godding| 47|  aaltoftsdl@cmu.edu|    Indonesia|           |     cat|    Nani|           S

In [14]:
# Append into the customers table; running this again would insert the rows a second time.
customers_df.write.mode("append").jdbc(url=jdbc_url, table="customers", properties=properties)

### Продавцы

In [15]:
# One row per distinct combination of all seller_* attributes.
seller_source_columns = [
    f'seller_{field}'
    for field in ('first_name', 'last_name', 'email', 'country', 'postal_code')
]
sellers_df = mock_data_df.select(*seller_source_columns).distinct()

In [16]:
# Strip the "seller_" prefix so the names match the target sellers table.
# Columns without the prefix are left alone, so re-running is a no-op.
seller_prefix = 'seller_'
for source_column in sellers_df.columns:
    if source_column.startswith(seller_prefix):
        sellers_df = sellers_df.withColumnRenamed(
            source_column, source_column[len(seller_prefix):]
        )

sellers_df.show()

+----------+-----------+--------------------+-----------+-----------+
|first_name|  last_name|               email|    country|postal_code|
+----------+-----------+--------------------+-----------+-----------+
|      Jade|    Poulden|jpoulden2c@source...|      China|           |
|     Meier|     Byrnes|mbyrnesf8@hubpage...|    Ireland|        W91|
| Granville|      Hemms|  ghemmshk@salon.com|     Mexico|      56617|
|   Brander|    Paradin|  bparadinc8@163.com|      China|           |
|    Evelyn|  Trubshawe|etrubshawenb@lati...|    Ukraine|           |
|Georgianna|  Bradbrook|gbradbrooknx@page...|Philippines|       5103|
|   Katleen|     Cosins|   kcosins8@wisc.edu|     Serbia|           |
|   Annabel|     Brumen|   abrumen4s@com.com| Azerbaijan|           |
|    Wendel|  Castleman|wcastleman8f@naro...|    Morocco|           |
|   Pernell|   Flacknoe|pflacknoe98@netwo...|   Colombia|     734048|
|    Bordie|  Collymore|bcollymore9f@hc36...|      Italy|      35129|
|      Tony|   Bramh

In [17]:
# Append into the sellers table; running this again would insert the rows a second time.
sellers_df.write.mode("append").jdbc(url=jdbc_url, table="sellers", properties=properties)

### Товары

In [18]:
# One row per distinct product. pet_category is the only product attribute
# in the source that lacks the "product_" prefix.
product_source_columns = (
    'product_name', 'product_category', 'pet_category', 'product_price',
    'product_quantity', 'product_weight', 'product_color', 'product_size',
    'product_brand', 'product_material', 'product_description',
    'product_rating', 'product_reviews', 'product_release_date',
    'product_expiry_date',
)
products_df = mock_data_df.select(*product_source_columns).distinct()

In [19]:
# Strip the "product_" prefix so the names match the target products table;
# pet_category has no prefix and keeps its name. A parenthesized chain is
# used instead of backslash continuations, so there is no dangling "\" that
# could swallow the following statement.
products_df = (
    products_df
    .withColumnRenamed('product_name', 'name')
    .withColumnRenamed('product_category', 'category')
    .withColumnRenamed('product_price', 'price')
    .withColumnRenamed('product_quantity', 'quantity')
    .withColumnRenamed('product_weight', 'weight')
    .withColumnRenamed('product_color', 'color')
    .withColumnRenamed('product_size', 'size')
    .withColumnRenamed('product_brand', 'brand')
    .withColumnRenamed('product_material', 'material')
    .withColumnRenamed('product_description', 'description')
    .withColumnRenamed('product_rating', 'rating')
    .withColumnRenamed('product_reviews', 'reviews')
    .withColumnRenamed('product_release_date', 'release_date')
    .withColumnRenamed('product_expiry_date', 'expiry_date')
)

products_df.show()

+---------+--------+------------+-----+--------+------+----------+------+---------+----------+--------------------+------+-------+------------+-----------+
|     name|category|pet_category|price|quantity|weight|     color|  size|    brand|  material|         description|rating|reviews|release_date|expiry_date|
+---------+--------+------------+-----+--------+------+----------+------+---------+----------+--------------------+------+-------+------------+-----------+
|  Cat Toy|    Food|        Fish|27.23|      42|   4.0|    Fuscia|Medium|   Yakijo|     Glass|In congue. Etiam ...|   1.7|    890|   5/15/2011| 12/31/2028|
| Dog Food|    Cage|        Dogs|49.02|      17|  36.2|    Maroon|Medium|Linklinks|     Vinyl|Cras non velit ne...|   4.1|     25|   2/26/2017|   6/2/2030|
|Bird Cage|     Toy|    Reptiles|85.53|      26|  44.3|      Blue| Small|   Tambee|Plexiglass|Morbi non lectus....|   2.1|    691|  12/23/2020|  3/14/2024|
|Bird Cage|    Cage|        Fish|  6.3|      59|  45.5|       Re

In [20]:
from pyspark.sql.functions import to_date

# Use Spark's legacy datetime parser for the 'M/d/yyyy' to_date calls below.
legacy_parser_setting = ("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set(*legacy_parser_setting)

In [21]:
# Source dates are US-style month/day/year strings (e.g. '5/15/2011').
# Only columns that are still strings are converted. Re-running the cell
# would otherwise push DATE values (rendered as 'yyyy-MM-dd') back through
# the 'M/d/yyyy' pattern, which would most likely yield NULL.
for date_column in ('release_date', 'expiry_date'):
    if dict(products_df.dtypes)[date_column] == 'string':
        products_df = products_df.withColumn(
            date_column, to_date(date_column, 'M/d/yyyy')
        )

products_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- pet_category: string (nullable = true)
 |-- price: float (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- weight: float (nullable = true)
 |-- color: string (nullable = true)
 |-- size: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- material: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- reviews: integer (nullable = true)
 |-- release_date: date (nullable = true)
 |-- expiry_date: date (nullable = true)



In [22]:
# Append into the products table; running this again would insert the rows a second time.
products_df.write.mode("append").jdbc(url=jdbc_url, table="products", properties=properties)

### Магазины

In [23]:
# One row per distinct combination of all store_* attributes.
store_source_columns = [
    f'store_{field}'
    for field in ('name', 'location', 'city', 'state', 'country', 'phone', 'email')
]
stores_df = mock_data_df.select(*store_source_columns).distinct()

In [24]:
# Strip the "store_" prefix so the names match the target stores table.
# Columns without the prefix are left alone, so re-running is a no-op.
store_prefix = 'store_'
for source_column in stores_df.columns:
    if source_column.startswith(store_prefix):
        stores_df = stores_df.withColumnRenamed(
            source_column, source_column[len(store_prefix):]
        )

stores_df.show()

+----------+------------+-------------------+-----+-----------+------------+--------------------+
|      name|    location|               city|state|    country|       phone|               email|
+----------+------------+-------------------+-----+-----------+------------+--------------------+
|    Avamba|     Apt 633|            Sakurai|     |  Venezuela|302-743-6989| btolletb@dion.ne.jp|
|   Oyonder|    Apt 1879|               Perm|     |      China|113-568-4127|kcadell6k@fastcom...|
|Fivebridge|    Apt 1486|            Miskolc|   BZ|     Russia|298-216-4562|dheskin76@vinaora...|
|     Jaloo|     Suite 4|           Sarongan|     |     Latvia|330-967-2831|msellorju@skyrock...|
|     Quimm|    Apt 1809|Figueiró dos Vinhos|   10|    Morocco|577-217-8194| tnunnery2g@imdb.com|
|  Gigazoom|     Apt 854|    Kuragaki-kosugi|     |  Indonesia|845-948-6044|spendlebury48@img...|
|    Skinix|  13th Floor|           Pasauran|     |     Poland|155-265-8133|bclyburnei@bbc.co.uk|
|   Dabvine|   1st F

In [25]:
# Append into the stores table; running this again would insert the rows a second time.
stores_df.write.mode("append").jdbc(url=jdbc_url, table="stores", properties=properties)

### Поставщики

In [26]:
# One row per distinct combination of all supplier_* attributes.
supplier_source_columns = [
    f'supplier_{field}'
    for field in ('name', 'contact', 'email', 'phone', 'address', 'city', 'country')
]
suppliers_df = mock_data_df.select(*supplier_source_columns).distinct()

In [27]:
# Strip the "supplier_" prefix so the names match the target suppliers table.
# Columns without the prefix are left alone, so re-running is a no-op.
supplier_prefix = 'supplier_'
for source_column in suppliers_df.columns:
    if source_column.startswith(supplier_prefix):
        suppliers_df = suppliers_df.withColumnRenamed(
            source_column, source_column[len(supplier_prefix):]
        )

suppliers_df.show()

+------------+------------------+--------------------+------------+------------+------------+------------+
|        name|           contact|               email|       phone|     address|        city|     country|
+------------+------------------+--------------------+------------+------------+------------+------------+
|       LiveZ|    Kandy Brinkley|kbrinkley1t@admin.ch|857-497-1268|    Suite 33|Puerto Bello| Philippines|
|       Meejo| Letisha Kennerley|lkennerleye7@psu.edu|745-964-4521|    Room 318|     Karatsu|       China|
|       Quinu|    Tisha Colquyte| tcolquytepz@icq.com|485-924-2211|  11th Floor|       Louta|    Colombia|
|       Kamba|       Gena Malloy|    gmalloy71@hp.com|253-333-8052|    Suite 66|       Kotes|South Africa|
|     Innojam|      Rhody Caroll|rcaroll99@elegant...|451-439-9637|PO Box 21978|  Carpentras|      Canada|
|        Mita|   Laurie Waistell|lwaistell19@surve...|193-723-9251|PO Box 82426|    Koryčany|      Greece|
|  Topicshots|Cairistiona Totaro|ctot

In [28]:
# Append into the suppliers table; running this again would insert the rows a second time.
suppliers_df.write.mode("append").jdbc(url=jdbc_url, table="suppliers", properties=properties)

### Продажи

In [29]:
from pyspark.sql.functions import col

In [30]:
def _read_dimension(table_name):
    """Read a dimension table back from PostgreSQL.

    The result includes the ``id`` column the database assigned on insert
    (presumably a serial/identity column defined in ddl.sql — TODO confirm).
    The sales join below uses that column.
    """
    return spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)


customers_full_df = _read_dimension('customers')
sellers_full_df = _read_dimension('sellers')
products_full_df = _read_dimension('products')
stores_full_df = _read_dimension('stores')
suppliers_full_df = _read_dimension('suppliers')

In [31]:
def _null_safe_match(left_df, right_df, column_pairs):
    """Build ``left[l1] <=> right[r1] AND left[l2] <=> right[r2] AND ...``.

    ``eqNullSafe`` treats NULL vs NULL as equal. A row whose attribute is NULL
    on both sides therefore still matches, instead of silently disappearing
    from an inner join (plain ``==`` evaluates to NULL there).

    Raises ValueError for empty ``column_pairs``, because a missing condition
    would make the join unconditional.
    """
    if not column_pairs:
        raise ValueError("column_pairs must not be empty")
    condition = None
    for left_name, right_name in column_pairs:
        term = left_df[left_name].eqNullSafe(right_df[right_name])
        condition = term if condition is None else condition & term
    return condition


# (mock_data column, products column) pairs that identify a product row.
# NOTE(review): products were de-duplicated on more columns (quantity, rating,
# reviews, release/expiry dates) than are compared here. If two product rows
# differ only in those, a sale matches both and is duplicated — TODO confirm
# this cannot happen for this dataset.
product_key_pairs = [
    ('product_name', 'name'),
    ('product_category', 'category'),
    ('pet_category', 'pet_category'),
    ('product_weight', 'weight'),
    ('product_color', 'color'),
    ('product_size', 'size'),
    ('product_material', 'material'),
    ('product_price', 'price'),
    ('product_brand', 'brand'),
    ('product_description', 'description'),
]

# Fact rows: resolve each source row to the ids of its dimension rows.
# The single-column email joins keep plain `==`: a null-safe match on one
# column would pair every NULL-email sale with every NULL-email dimension row.
sales_df = (
    mock_data_df
    .join(customers_full_df, mock_data_df.customer_email == customers_full_df.email)
    .join(sellers_full_df, mock_data_df.seller_email == sellers_full_df.email)
    .join(stores_full_df, mock_data_df.store_email == stores_full_df.email)
    .join(suppliers_full_df, mock_data_df.supplier_email == suppliers_full_df.email)
    .join(
        products_full_df,
        _null_safe_match(mock_data_df, products_full_df, product_key_pairs),
    )
    .select(
        customers_full_df.id.alias('customer_id'),
        sellers_full_df.id.alias('seller_id'),
        products_full_df.id.alias('product_id'),
        stores_full_df.id.alias('store_id'),
        suppliers_full_df.id.alias('supplier_id'),
        # sale_date uses the same month/day/year text format as product dates.
        to_date(mock_data_df.sale_date, 'M/d/yyyy').alias('date'),
        mock_data_df.sale_quantity.alias('quantity'),
        mock_data_df.sale_total_price.alias('total_price'),
    )
)

sales_df.show()

+-----------+---------+----------+--------+-----------+----------+--------+-----------+
|customer_id|seller_id|product_id|store_id|supplier_id|      date|quantity|total_price|
+-----------+---------+----------+--------+-----------+----------+--------+-----------+
|          3|     3583|      6218|     535|       7083|2021-01-14|       7|      19.18|
|        476|     7351|      1195|    1442|       1887|2021-03-26|       9|     143.98|
|        959|     3330|      2431|    2526|       1882|2021-02-05|      10|     460.94|
|       1229|     9566|      6592|    8038|       6883|2021-08-06|       7|     325.13|
|       1318|     1273|      6720|     241|       5866|2021-03-10|       7|     364.95|
|       1471|     1629|       978|    9824|       7015|2021-01-07|       1|     400.37|
|       1812|     2200|      1961|    4206|       8596|2021-06-01|       6|     166.29|
|       2204|     5813|      3459|    5073|       7737|2021-04-02|       3|      32.49|
|       2210|     6763|      223

In [32]:
# Append the fact rows into the sales table; running this again would insert them a second time.
sales_df.write.mode("append").jdbc(url=jdbc_url, table="sales", properties=properties)