In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [10]:
def show(df):
    print(df.count())
    df.show()
    return df

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, StringType
spark = SparkSession.builder.getOrCreate()

df = show(spark.read.parquet('data.parquet'))



108984
+------+--------------------+------+-----+---------+------------+----------------+--------+---+----------+-----+--------------------+--------------------+
|  shop|                name|weight|price|old_price|price_per_kg|old_price_per_kg|discount|cat|product_id| type|              datetm|                 url|
+------+--------------------+------+-----+---------+------------+----------------+--------+---+----------+-----+--------------------+--------------------+
|prisma|Чиабатта в газово...|   250| 99.9|     null|       399.6|            null|    null|  8|    226301|piece|2020-03-15 19:43:...|/products/27683-c...|
|prisma|Сдоба Наслаждение...|   230|123.9|     null|       538.7|            null|    null|  8|    536951|piece|2020-03-15 19:43:...|/products/43616-s...|
|prisma|Хлебец Подовый с ...|   300| 44.9|     60.9|      149.67|           203.0|   26.27|  8|    222117|piece|2020-03-15 19:43:...|/products/16970-h...|
|prisma|Хлебцы порционные...|   240| 65.9|     null|      274.5

In [12]:
# drop duplicates
df2 = show(df.dropDuplicates(['product_id','shop']))

57124
+-----+--------------------+------+------+---------+------------+----------------+--------+---+----------+-----+--------------------+--------------------+
| shop|                name|weight| price|old_price|price_per_kg|old_price_per_kg|discount|cat|product_id| type|              datetm|                 url|
+-----+--------------------+------+------+---------+------------+----------------+--------+---+----------+-----+--------------------+--------------------+
|metro|Масло подсолнечно...|   920| 104.0|     null|      113.04|            null|    null| 99|    122548|piece|2020-03-15 18:49:...|/products/1187-ma...|
|metro|  Мята ТМ Зеленьторг|   100|159.89|     null|      1598.9|            null|    null|  1|    122853|piece|2020-03-15 17:57:...|/products/3928-my...|
|metro|Мороженое эскимо ...|    80| 63.69|    70.79|      796.13|         884.875|   10.03|  2|    122854|piece|2020-03-15 18:04:...|/products/14728-m...|
|metro|Чай черный байхов...|   250| 194.9|    369.0|       779.6

In [13]:
df2.createOrReplaceTempView("food")


In [14]:
# total products per shop
show(spark.sql("SELECT shop, count(*) FROM food group by shop"))

4
+-------+--------+
|   shop|count(1)|
+-------+--------+
| prisma|    8774|
|  metro|   20834|
|karusel|   13166|
|  lenta|   14350|
+-------+--------+



DataFrame[shop: string, count(1): bigint]

In [15]:
# total products with discounts per shop 
show(spark.sql("SELECT shop, count(*) FROM food where old_price is not null group by shop"))

4
+-------+--------+
|   shop|count(1)|
+-------+--------+
| prisma|     922|
|  metro|    2862|
|karusel|    2009|
|  lenta|    1492|
+-------+--------+



DataFrame[shop: string, count(1): bigint]

In [16]:
#joining tables
q = '''SELECT m.name, m.weight, m.price m_price, l.price l_price, k.price k_price, p.price p_price,
        m.old_price m_old_price, l.old_price l_old_price, k.old_price k_old_price, p.old_price p_old_price
        FROM (select * from food where shop='metro') as m
        INNER JOIN (select * from food where shop='lenta') as l
            ON m.url=l.url
        INNER JOIN (select * from food where shop='karusel') as k
            ON m.url=k.url
        INNER JOIN (select * from food where shop='prisma') as p
            ON m.url=p.url
            
'''
joined = show(spark.sql(q).cache())
joined.coalesce(1).write.csv('joined.csv', mode='overwrite', sep='\t', nullValue = '\u0000',
    emptyValue = '\u0000', header=True)
joined.createOrReplaceTempView('joined')


964
+--------------------+------+-------+-------+-------+-------+-----------+-----------+-----------+-----------+
|                name|weight|m_price|l_price|k_price|p_price|m_old_price|l_old_price|k_old_price|p_old_price|
+--------------------+------+-------+-------+-------+-------+-----------+-----------+-----------+-----------+
|Овощные галеты Зе...|   500|  235.0| 242.79| 259.99|  179.9|       null|       null|       null|      242.9|
|Биоактивная зубна...|   100|  151.0|  88.99| 149.99|  184.9|       null|     157.69|       null|       null|
|Шоколад молочный ...|   100|   76.0|  49.89|  85.99|  104.9|       null|      88.39|       null|       null|
|Кофе в зернах Pre...|   250|  307.0| 198.89| 351.99|  419.9|       null|     378.99|       null|       null|
|Средство для мыть...|   500|  130.0| 146.39|  99.99|  159.9|       null|       null|     146.99|       null|
|Фруктовое пюре из...|   100|   36.0|  37.79|  37.99|   38.9|       null|       null|       null|       null|
|Драже

In [17]:
# sum of prices ignore discounts
show(spark.sql('select sum(m_price),sum(l_price),sum(k_price),sum(p_price) from joined where  m_old_price IS NULL AND  l_old_price IS NULL AND k_old_price IS  NULL AND p_old_price IS  NULL'))

1
+-----------------+-----------------+-----------------+-----------------+
|     sum(m_price)|     sum(l_price)|     sum(k_price)|     sum(p_price)|
+-----------------+-----------------+-----------------+-----------------+
|76482.74999523163|78939.19991779327|76190.60993289948|86779.39966773987|
+-----------------+-----------------+-----------------+-----------------+



DataFrame[sum(m_price): double, sum(l_price): double, sum(k_price): double, sum(p_price): double]

In [18]:
# sum of prices
show(spark.sql('select sum(m_price),sum(l_price),sum(k_price),sum(p_price) from joined'))


1
+------------------+------------------+-----------------+------------------+
|      sum(m_price)|      sum(l_price)|     sum(k_price)|      sum(p_price)|
+------------------+------------------+-----------------+------------------+
|152692.18993282318|165778.82007026672|152971.6798429489|180685.98883247375|
+------------------+------------------+-----------------+------------------+



DataFrame[sum(m_price): double, sum(l_price): double, sum(k_price): double, sum(p_price): double]