In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/22 18:06:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
stores_schema = StructType([
    StructField('Store', IntegerType()),
    StructField('Type', StringType()),
    StructField('Size', IntegerType())
])
df_stores = spark.read.format('csv').option("header", True).schema(stores_schema).load('./input/stores.csv')
df_stores.createOrReplaceTempView('stores')

In [3]:
df_features = spark.read.format('csv').option("header", True).load('./input/features.csv')
df_features.createOrReplaceTempView('features')

In [4]:
df = spark.sql("Describe stores")
df.show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|   Store|      int|   null|
|    Type|   string|   null|
|    Size|      int|   null|
+--------+---------+-------+



In [5]:
df = spark.sql("Select store, size from stores")
df.show(5)

+-----+------+
|store|  size|
+-----+------+
|    1|151315|
|    2|202307|
|    3| 37392|
|    4|205863|
|    5| 34875|
+-----+------+
only showing top 5 rows



In [6]:
df = spark.sql("Select * from stores where type = 'A' AND size > 200000")
df.show(5)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    2|   A|202307|
|    4|   A|205863|
|    6|   A|202505|
|   11|   A|207499|
|   13|   A|219622|
+-----+----+------+
only showing top 5 rows



In [7]:
df = spark.sql("Select type, SUM(size), avg(size), count(1) from stores Group by type")
df.show(5)

+----+---------+------------------+--------+
|type|sum(size)|         avg(size)|count(1)|
+----+---------+------------------+--------+
|   B|  1720242|101190.70588235294|      17|
|   C|   243250|40541.666666666664|       6|
|   A|  3899450|177247.72727272726|      22|
+----+---------+------------------+--------+



In [11]:
df = spark.sql("Select store, type, size, collect_list(size) over (PARTITION by type order by store) as sum from stores")
df.show(50)

+-----+----+------+--------------------+
|store|type|  size|                 sum|
+-----+----+------+--------------------+
|    1|   A|151315|            [151315]|
|    2|   A|202307|    [151315, 202307]|
|    4|   A|205863|[151315, 202307, ...|
|    6|   A|202505|[151315, 202307, ...|
|    8|   A|155078|[151315, 202307, ...|
|   11|   A|207499|[151315, 202307, ...|
|   13|   A|219622|[151315, 202307, ...|
|   14|   A|200898|[151315, 202307, ...|
|   19|   A|203819|[151315, 202307, ...|
|   20|   A|203742|[151315, 202307, ...|
|   24|   A|203819|[151315, 202307, ...|
|   26|   A|152513|[151315, 202307, ...|
|   27|   A|204184|[151315, 202307, ...|
|   28|   A|206302|[151315, 202307, ...|
|   31|   A|203750|[151315, 202307, ...|
|   32|   A|203007|[151315, 202307, ...|
|   33|   A| 39690|[151315, 202307, ...|
|   34|   A|158114|[151315, 202307, ...|
|   36|   A| 39910|[151315, 202307, ...|
|   39|   A|184109|[151315, 202307, ...|
|   40|   A|155083|[151315, 202307, ...|
|   41|   A|1963

In [9]:
df = spark.sql("Select stores.store as store, first(type), avg(features.fuel_price) as price from stores left join features on stores.store = features.store group by stores.store")
df.show(5)

+-----+-----------+------------------+
|store|first(type)|             price|
+-----+-----------+------------------+
|    1|          A| 3.259241758241762|
|    2|          A| 3.259241758241762|
|    3|          B| 3.259241758241762|
|    4|          A|3.2548846153846154|
|    5|          B| 3.259241758241762|
|    6|          A| 3.259241758241762|
|    7|          B|3.2944010989010986|
|    8|          A| 3.259241758241762|
|    9|          B| 3.259241758241762|
|   10|          B|3.6156483516483515|
|   11|          A| 3.259241758241762|
|   12|          B| 3.643653846153846|
|   13|          A| 3.328763736263739|
|   14|          A|3.4764120879120886|
|   15|          B| 3.643357142857143|
|   16|          B|3.2944010989010986|
|   17|          B| 3.328763736263739|
|   18|          B|3.4978736263736248|
|   19|          A| 3.643357142857143|
|   20|          A|3.4764120879120886|
|   21|          B| 3.259241758241762|
|   22|          B|3.4978736263736248|
|   23|          B|3.4978

In [10]:
df = spark.sql("Select stores.store as store, first(type), collect_list(features.fuel_price) as price from stores left join features on stores.store = features.store group by stores.store")
df.show(5)

+-----+-----------+--------------------+
|store|first(type)|               price|
+-----+-----------+--------------------+
|   31|          A|[3.62, 3.556, 3.4...|
|   34|          A|[3.58, 3.518, 3.3...|
|   28|          A|[3.865, 3.823, 3....|
|   26|          A|[3.879, 3.803, 3....|
|   27|          A|[3.951, 3.913, 3....|
+-----+-----------+--------------------+
only showing top 5 rows

