# APACHE SPARK

### Day 1

#### Introduction + Spark SQL

Apache Spark is an open-source engine for distributed computing. It was originally developed at UC Berkeley and later donated to the Apache Software Foundation, which has maintained and developed it ever since. Spark provides an interface for programming whole clusters with implicit data parallelism and fault tolerance.

Spark is available for Scala, Python, Java and R.

**Spark components:**
* Spark Core - the foundation of Spark, with the basic data abstraction called the RDD
* Spark SQL - a component for working with structured data using operations familiar from SQL; easy to use
* Spark MLlib - the component with Spark's machine learning algorithms; ML at cluster scale
* Spark Streaming - a module for working with data streams
* Spark GraphX - a component for working with graphs

**Spark architecture:**
* driver - the process that runs the application's main function and creates the SparkContext
* executor(s) - processes launched for an application on worker nodes; they run tasks and keep data in memory or on disk. Each application has its own executors
* cluster manager - the service that allocates cluster resources to applications; available options: YARN, Mesos, Kubernetes, Standalone

**SparkContext:**
* the entry point for working with Spark
* coordinates the processes on the cluster
* stopping the SparkContext == stopping the application
* usually named `sc`
* steps needed to create a SparkContext in PySpark:

> import pyspark

> sc = pyspark.SparkContext(appName="my_app")

**SparkSession:**
* introduced in Spark 2.0
* combines SparkContext, SQLContext and HiveContext into a single entry point (the underlying SparkContext is available as `spark.sparkContext`)
* usually named `spark`
* steps needed to create a SparkSession in PySpark:

> from pyspark.sql import SparkSession

> spark = SparkSession.builder.appName('my_app').getOrCreate()


**RDD:**
* the basic data abstraction in Spark
* R - resilient
* D - distributed
* D - dataset
* Matei Zaharia et al., `Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing`
* immutable
* in-memory
* lazily evaluated
* parallel
* two kinds of operations: transformations (lazy; they return a new RDD, e.g. `map`, `filter`) and actions (they trigger the computation and return a result to the driver, e.g. `reduce`, `collect`)
* usage examples:

In [1]:
import pyspark
sc = pyspark.SparkContext(appName="my_app")

In [3]:
type(sc)

pyspark.context.SparkContext

In [2]:
sc.parallelize(range(20)) \
.map(lambda x: x * 2) \
.filter(lambda x: x != 2) \
.reduce(lambda x,y: x + y)

378

In [5]:
sc.parallelize(range(20)) \
.map(lambda x: x * 2) \
.filter(lambda x: x != 2).collect()  # collect() brings every element back to the driver; be careful with large RDDs

[0, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
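To see what "lazily evaluated" means for the pipelines above, here is a plain-Python analogy (not the Spark API): generator expressions, like RDD transformations, only describe the work, and nothing runs until a consumer, playing the role of an action such as `reduce`, pulls the data. The `double` helper and the `log` list are illustrative names only.

```python
from functools import reduce

log = []  # records every element that is actually processed


def double(x):
    log.append(x)
    return x * 2


mapped = (double(x) for x in range(20))    # like .map(): only describes the work
filtered = (x for x in mapped if x != 2)   # like .filter(): still nothing computed
assert log == []                           # no element has been touched yet

total = reduce(lambda x, y: x + y, filtered)  # like .reduce(): the "action" runs everything
print(total)     # 378
print(len(log))  # 20
```

One difference worth keeping in mind: a generator can be consumed only once, whereas an RDD can serve many actions; Spark recomputes it from its lineage for each action unless it was cached with `cache()` or `persist()`.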

In [6]:
exercise_input = """Spark is a fast and general processing engine compatible with Hadoop data. \
It can run in Hadoop clusters through YARN or Spark's standalone mode, and it can process data in HDFS, \
HBase, Cassandra, Hive, and any Hadoop InputFormat. It is designed to perform both batch processing \
(similar to MapReduce) and new workloads like streaming, interactive queries, and machine learning."""

sc.parallelize(exercise_input.split(" ")) \
.map(lambda x: x.lower().strip(",.()")) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y: x + y) \
.collect()

[('is', 2),
 ('engine', 1),
 ('compatible', 1),
 ('hadoop', 3),
 ('run', 1),
 ('in', 2),
 ('clusters', 1),
 ('yarn', 1),
 ("spark's", 1),
 ('mode', 1),
 ('process', 1),
 ('hdfs', 1),
 ('cassandra', 1),
 ('hive', 1),
 ('designed', 1),
 ('perform', 1),
 ('both', 1),
 ('similar', 1),
 ('new', 1),
 ('like', 1),
 ('streaming', 1),
 ('machine', 1),
 ('learning', 1),
 ('spark', 1),
 ('a', 1),
 ('fast', 1),
 ('and', 5),
 ('general', 1),
 ('processing', 2),
 ('with', 1),
 ('data', 2),
 ('it', 3),
 ('can', 2),
 ('through', 1),
 ('or', 1),
 ('standalone', 1),
 ('hbase', 1),
 ('any', 1),
 ('inputformat', 1),
 ('to', 2),
 ('batch', 1),
 ('mapreduce', 1),
 ('workloads', 1),
 ('interactive', 1),
 ('queries', 1)]
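`reduceByKey` merges all values that share a key using the supplied function. The sketch below reproduces that semantics locally in plain Python on a shorter, made-up sentence; `reduce_by_key` and `word_count` are hypothetical helpers, not part of PySpark.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func (hypothetical local helper)."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())


def word_count(text):
    words = (w.lower().strip(",.()") for w in text.split(" "))  # .map(): normalise words
    pairs = ((w, 1) for w in words)                              # .map(): (word, 1)
    return dict(reduce_by_key(pairs, lambda x, y: x + y))        # .reduceByKey()


counts = word_count("Spark is fast, and Spark is general (and open).")
print(counts["spark"], counts["and"], counts["open"])  # 2 2 1
```

In Spark the function is applied first within each partition and then across partitions after the shuffle, which is why it should be associative and commutative (as `+` is).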

In [7]:
sc.stop()

**DataFrame:**
* the data abstraction of the Spark SQL module, built on top of RDDs
* immutable
* in-memory
* resilient
* distributed
* parallel
* stores additional information about the structure of the data (the schema)
* a distributed collection of rows with named columns
* optimized by the Catalyst optimizer
* lets you work with the data using queries familiar from SQL/Hive

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as f  # import under an alias so PySpark's sum, min, max, etc. don't shadow Python built-ins

spark = SparkSession.builder.appName('my_app').getOrCreate()

**DataFrame - a collection of rows**

In [2]:
dummy_df = spark.createDataFrame([Row(name='Greg', age=32),
                                 Row(name='Bob', age=27),
                                 Row(name='Alice', age=30)])

In [3]:
dummy_df.show()

+---+-----+
|age| name|
+---+-----+
| 32| Greg|
| 27|  Bob|
| 30|Alice|
+---+-----+

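Note: the `age, name` column order reflects Spark 2.x, where `Row(...)` with keyword arguments sorted its fields alphabetically; since Spark 3.0 (on Python 3.6+) the fields keep the order in which they are passed, so `name` would come first.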

In [4]:
dummy_df2 = spark.createDataFrame([Row(name='Bill', age=26),
                                  Row(name='Carol', age=28),
                                  Row(name='Susan', age=25)])

In [5]:
dummy_df2.show()

+---+-----+
|age| name|
+---+-----+
| 26| Bill|
| 28|Carol|
| 25|Susan|
+---+-----+



In [15]:
dummy_df.take(3)

[Row(age=32, name='Greg'), Row(age=27, name='Bob'), Row(age=30, name='Alice')]

**DataFrame - properties**

In [16]:
dummy_df.columns

['age', 'name']

In [17]:
dummy_df.dtypes

[('age', 'bigint'), ('name', 'string')]

In [18]:
dummy_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [19]:
dummy_df.describe().show()

+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 3|    3|
|   mean|29.666666666666668| null|
| stddev|2.5166114784235836| null|
|    min|                27|Alice|
|    max|                32| Greg|
+-------+------------------+-----+
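The `mean` and `stddev` rows can be checked by hand. A quick plain-Python check with the standard `statistics` module indicates that `describe()` reports the *sample* standard deviation (denominator n - 1; Spark's `stddev` is an alias of `stddev_samp`), not the population one:

```python
import statistics

ages = [32, 27, 30]  # the age column of dummy_df

mean_age = statistics.mean(ages)
sample_sd = statistics.stdev(ages)       # n - 1 in the denominator
population_sd = statistics.pstdev(ages)  # n in the denominator, for comparison

print(mean_age)       # 29.666666666666668
print(sample_sd)      # about 2.5166, matching the stddev row above
print(population_sd)  # about 2.0548, which describe() does not show
```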



**Ways of referring to columns**

In [20]:
dummy_df.age

Column<b'age'>

In [26]:
dummy_df["age"]

Column<b'age'>

In [27]:
dummy_df[0]

Column<b'age'>

**SQL-inspired syntax**

In [28]:
dummy_df.select("age").show()

+---+
|age|
+---+
| 32|
| 27|
| 30|
+---+



In [29]:
dummy_df.where(dummy_df["age"] > 27).show()

+---+-----+
|age| name|
+---+-----+
| 32| Greg|
| 30|Alice|
+---+-----+



In [30]:
dummy_df.agg(f.avg(dummy_df["age"])).show()

+------------------+
|          avg(age)|
+------------------+
|29.666666666666668|
+------------------+



In [31]:
# or, closer to SQL
dummy_df.select(f.avg(dummy_df["age"])).show()

+------------------+
|          avg(age)|
+------------------+
|29.666666666666668|
+------------------+



In [32]:
dummy_df.union(dummy_df2).show()

+---+-----+
|age| name|
+---+-----+
| 32| Greg|
| 27|  Bob|
| 30|Alice|
| 26| Bill|
| 28|Carol|
| 25|Susan|
+---+-----+
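`union` matches columns by *position*, not by name, and keeps duplicates (it behaves like SQL `UNION ALL`; call `distinct()` afterwards for a set union). It works here because both DataFrames have the same column order; when the order may differ, `unionByName` is the safer choice. The plain-Python sketch below, with hypothetical helpers operating on `(columns, rows)` pairs, shows what goes wrong otherwise:

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Like DataFrame.union: match columns by position, keep duplicates (hypothetical helper)."""
    if len(cols_a) != len(cols_b):
        raise ValueError("both sides need the same number of columns")
    return cols_a, rows_a + rows_b  # column names are taken from the first side


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Like DataFrame.unionByName: reorder the second side's values to match by name."""
    order = [cols_b.index(c) for c in cols_a]
    return cols_a, rows_a + [tuple(row[i] for i in order) for row in rows_b]


a_cols, a_rows = ["age", "name"], [(32, "Greg")]
b_cols, b_rows = ["name", "age"], [("Bill", 26)]  # same columns, different order

print(union_by_position(a_cols, a_rows, b_cols, b_rows)[1])  # [(32, 'Greg'), ('Bill', 26)]
print(union_by_name(a_cols, a_rows, b_cols, b_rows)[1])      # [(32, 'Greg'), (26, 'Bill')]
```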



**Saving results**

In [None]:
#dummy_df.write.parquet('path/to/location/dummy_df.parquet')
#dummy_df.write.csv('path/to/location/dummy_df.csv')

**Loading data**

In [None]:
#parquet_df = spark.read.parquet('path/to/parquet')
#csv_df = spark.read.csv('path/to/csv', header=True, inferSchema=True)

**Using SQL queries**

https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive

In [33]:
dummy_df.createOrReplaceTempView('dummy_df')  # register the DataFrame as a temporary view

In [34]:
# spark.sql() returns a new DataFrame
spark.sql('select * from dummy_df').show()

+---+-----+
|age| name|
+---+-----+
| 32| Greg|
| 27|  Bob|
| 30|Alice|
+---+-----+



In [35]:
spark.catalog.dropTempView("dummy_df")

### Example transformations using the Spark SQL API and SQL SELECT statements

**Creating sample DataFrames**

In [7]:
import random

In [8]:
random.seed(42)

In [9]:
geo_id = [random.choice(["regA","regB","regC","regD","regE","regF"]) for x in range(400)]

In [10]:
prod_id = [random.choice(["prodA","prodB","prodC","prodD","prodE","prodF",
                          "prodG","prodH","prodI","prodJ","prodK","prodL","prodM"]) for x in range(400)]

In [11]:
value = [random.uniform(1000,10000) for x in range(400)]

In [12]:
df = spark.createDataFrame([Row(prod=p, geo=g, val=v) for p,g,v in zip(prod_id, geo_id, value)])

In [13]:
df.show()

+----+-----+------------------+
| geo| prod|               val|
+----+-----+------------------+
|regF|prodB| 3941.612243931777|
|regA|prodD| 7851.067371046244|
|regA|prodF| 4412.135940077227|
|regF|prodE| 7768.088411993063|
|regC|prodC| 8487.318566397455|
|regB|prodH|3270.4437860414255|
|regB|prodI| 1737.156094854783|
|regB|prodL|1174.4495834500963|
|regF|prodE| 5854.771431302804|
|regA|prodJ| 9999.170456582882|
|regF|prodM| 4149.643093481655|
|regF|prodK| 6851.296839248875|
|regE|prodI|8031.0974464980545|
|regA|prodA| 6865.791897195005|
|regE|prodK| 7788.098836535736|
|regD|prodI|   9546.5055944439|
|regA|prodE|2794.2461412627963|
|regA|prodK|1183.4201558829911|
|regA|prodB|2371.4411120631135|
|regB|prodC|2135.9887738678563|
+----+-----+------------------+
only showing top 20 rows



In [43]:
df.createOrReplaceTempView('train_df')

In [44]:
geo_df = spark.createDataFrame([Row(geo_id = "regA", geo_name = "Region A"),
                                Row(geo_id = "regB", geo_name = "Region B"),
                                Row(geo_id = "regC", geo_name = "Region C"),
                                Row(geo_id = "regD", geo_name = "Region D"),
                                Row(geo_id = "regE", geo_name = "Region_E")])

In [45]:
geo_df.show()

+------+--------+
|geo_id|geo_name|
+------+--------+
|  regA|Region A|
|  regB|Region B|
|  regC|Region C|
|  regD|Region D|
|  regE|Region_E|
+------+--------+



In [46]:
geo_df.createOrReplaceTempView("geo_df")

**Select**

In [48]:
df.select("prod").show(5)

+-----+
| prod|
+-----+
|prodB|
|prodD|
|prodF|
|prodE|
|prodC|
+-----+
only showing top 5 rows



In [17]:
df.select(f.lower(f.col('prod'))).show()

+-----------+
|lower(prod)|
+-----------+
|      prodb|
|      prodd|
|      prodf|
|      prode|
|      prodc|
|      prodh|
|      prodi|
|      prodl|
|      prode|
|      prodj|
|      prodm|
|      prodk|
|      prodi|
|      proda|
|      prodk|
|      prodi|
|      prode|
|      prodk|
|      prodb|
|      prodc|
+-----------+
only showing top 20 rows



In [50]:
spark.sql("select prod from train_df").show(5)

+-----+
| prod|
+-----+
|prodB|
|prodD|
|prodF|
|prodE|
|prodC|
+-----+
only showing top 5 rows



In [51]:
df.drop("prod").show()

+----+------------------+
| geo|               val|
+----+------------------+
|regF| 3941.612243931777|
|regA| 7851.067371046244|
|regA| 4412.135940077227|
|regF| 7768.088411993063|
|regC| 8487.318566397455|
|regB|3270.4437860414255|
|regB| 1737.156094854783|
|regB|1174.4495834500963|
|regF| 5854.771431302804|
|regA| 9999.170456582882|
|regF| 4149.643093481655|
|regF| 6851.296839248875|
|regE|8031.0974464980545|
|regA| 6865.791897195005|
|regE| 7788.098836535736|
|regD|   9546.5055944439|
|regA|2794.2461412627963|
|regA|1183.4201558829911|
|regA|2371.4411120631135|
|regB|2135.9887738678563|
+----+------------------+
only showing top 20 rows



In [52]:
spark.sql("select geo, val from train_df").show()

+----+------------------+
| geo|               val|
+----+------------------+
|regF| 3941.612243931777|
|regA| 7851.067371046244|
|regA| 4412.135940077227|
|regF| 7768.088411993063|
|regC| 8487.318566397455|
|regB|3270.4437860414255|
|regB| 1737.156094854783|
|regB|1174.4495834500963|
|regF| 5854.771431302804|
|regA| 9999.170456582882|
|regF| 4149.643093481655|
|regF| 6851.296839248875|
|regE|8031.0974464980545|
|regA| 6865.791897195005|
|regE| 7788.098836535736|
|regD|   9546.5055944439|
|regA|2794.2461412627963|
|regA|1183.4201558829911|
|regA|2371.4411120631135|
|regB|2135.9887738678563|
+----+------------------+
only showing top 20 rows



**Group by**

In [53]:
df.groupBy("prod").sum().show()

+-----+------------------+
| prod|          sum(val)|
+-----+------------------+
|prodE|200344.20973732683|
|prodM|208910.57198616146|
|prodL|180375.40537054365|
|prodI|195882.05155923506|
|prodB|118974.31060099883|
|prodG|204282.66023881637|
|prodC| 185378.1188498701|
|prodJ|153095.65225463244|
|prodA|165984.72898796143|
|prodD|180665.70024676438|
|prodF| 144171.5337693601|
|prodK|184605.15827039533|
|prodH|152223.36908939172|
+-----+------------------+



In [54]:
spark.sql("select prod, sum(val) from train_df group by prod").show()

+-----+------------------+
| prod|          sum(val)|
+-----+------------------+
|prodE|200344.20973732683|
|prodM|208910.57198616146|
|prodL|180375.40537054365|
|prodI|195882.05155923506|
|prodB|118974.31060099883|
|prodG|204282.66023881637|
|prodC| 185378.1188498701|
|prodJ|153095.65225463244|
|prodA|165984.72898796143|
|prodD|180665.70024676438|
|prodF| 144171.5337693601|
|prodK|184605.15827039533|
|prodH|152223.36908939172|
+-----+------------------+



In [55]:
df.groupBy(["prod","geo"]).sum().show()

+-----+----+------------------+
| prod| geo|          sum(val)|
+-----+----+------------------+
|prodI|regB| 26614.21261323439|
|prodA|regE|29147.939313541443|
|prodB|regF| 15182.29534355441|
|prodD|regE| 50465.05578580952|
|prodK|regB| 31934.40395342406|
|prodF|regD|30099.764020475784|
|prodH|regE| 9780.791256960383|
|prodB|regA|15173.427162975999|
|prodD|regF|14644.907916090564|
|prodG|regC|34617.725516208724|
|prodB|regE|30647.927342034312|
|prodK|regF|25307.428539388253|
|prodD|regD|32346.278487783064|
|prodM|regC| 7226.084397656817|
|prodL|regA|32890.519567608215|
|prodM|regD| 18403.31832397603|
|prodG|regF|  33858.2808189527|
|prodF|regE|1084.5768830938393|
|prodE|regF| 46132.38800533221|
|prodI|regC| 31181.94863954508|
+-----+----+------------------+
only showing top 20 rows



In [56]:
spark.sql("select prod, geo, sum(val) from train_df group by prod, geo").show()

+-----+----+------------------+
| prod| geo|          sum(val)|
+-----+----+------------------+
|prodI|regB| 26614.21261323439|
|prodA|regE|29147.939313541443|
|prodB|regF| 15182.29534355441|
|prodD|regE| 50465.05578580952|
|prodK|regB| 31934.40395342406|
|prodF|regD|30099.764020475784|
|prodH|regE| 9780.791256960383|
|prodB|regA|15173.427162975999|
|prodD|regF|14644.907916090564|
|prodG|regC|34617.725516208724|
|prodB|regE|30647.927342034312|
|prodK|regF|25307.428539388253|
|prodD|regD|32346.278487783064|
|prodM|regC| 7226.084397656817|
|prodL|regA|32890.519567608215|
|prodM|regD| 18403.31832397603|
|prodG|regF|  33858.2808189527|
|prodF|regE|1084.5768830938393|
|prodE|regF| 46132.38800533221|
|prodI|regC| 31181.94863954508|
+-----+----+------------------+
only showing top 20 rows



**Where (filter)**

In [57]:
df.where(df["prod"] != "prodA").show()

+----+-----+------------------+
| geo| prod|               val|
+----+-----+------------------+
|regF|prodB| 3941.612243931777|
|regA|prodD| 7851.067371046244|
|regA|prodF| 4412.135940077227|
|regF|prodE| 7768.088411993063|
|regC|prodC| 8487.318566397455|
|regB|prodH|3270.4437860414255|
|regB|prodI| 1737.156094854783|
|regB|prodL|1174.4495834500963|
|regF|prodE| 5854.771431302804|
|regA|prodJ| 9999.170456582882|
|regF|prodM| 4149.643093481655|
|regF|prodK| 6851.296839248875|
|regE|prodI|8031.0974464980545|
|regE|prodK| 7788.098836535736|
|regD|prodI|   9546.5055944439|
|regA|prodE|2794.2461412627963|
|regA|prodK|1183.4201558829911|
|regA|prodB|2371.4411120631135|
|regB|prodC|2135.9887738678563|
|regB|prodE| 7025.129601579196|
+----+-----+------------------+
only showing top 20 rows



In [None]:
spark.sql("select * from train_df where prod != 'prodA'").show()

In [58]:
# like
df.where(df.prod.like('%A')).show()

+----+-----+------------------+
| geo| prod|               val|
+----+-----+------------------+
|regA|prodA| 6865.791897195005|
|regC|prodA|7215.5297433968235|
|regA|prodA| 6403.704533351597|
|regA|prodA| 2089.495585605836|
|regA|prodA| 8458.365799022124|
|regE|prodA| 8273.792969260803|
|regC|prodA|  8476.57917001543|
|regD|prodA| 5182.376199429085|
|regB|prodA| 5417.392015707275|
|regA|prodA|2075.9926712954757|
|regD|prodA| 1904.734972817117|
|regB|prodA| 4964.219183224092|
|regB|prodA| 4606.366980267458|
|regF|prodA|  8741.49999243562|
|regE|prodA| 6589.245889649337|
|regD|prodA| 4888.257977446953|
|regA|prodA| 6918.943478348161|
|regA|prodA| 5883.342349214755|
|regD|prodA| 9930.314254556977|
|regD|prodA| 5961.385691924227|
+----+-----+------------------+
only showing top 20 rows



In [None]:
spark.sql("select * from train_df where prod like '%A'").show()

**Order by**

In [59]:
df.groupBy(["prod","geo"]).sum().orderBy("geo").show()

+-----+----+------------------+
| prod| geo|          sum(val)|
+-----+----+------------------+
|prodF|regA|16159.108540624577|
|prodA|regA| 60940.39816691554|
|prodI|regA| 4920.170437035049|
|prodK|regA|19950.005918466282|
|prodE|regA| 63855.75514005185|
|prodB|regA|15173.427162975999|
|prodJ|regA| 31533.65279126246|
|prodD|regA| 36200.10333060895|
|prodH|regA| 33347.42827483338|
|prodC|regA|27816.734886044535|
|prodL|regA|32890.519567608215|
|prodM|regA| 58332.29058567657|
|prodG|regA| 43771.03604185874|
|prodK|regB| 31934.40395342406|
|prodJ|regB|22852.827988291472|
|prodC|regB| 38666.67273025063|
|prodL|regB|21662.464276897255|
|prodM|regB| 61621.07292795643|
|prodG|regB|28958.581434979576|
|prodE|regB|27459.596285651718|
+-----+----+------------------+
only showing top 20 rows



In [None]:
spark.sql("select prod, geo, sum(val) from train_df group by prod, geo order by geo").show()

**Multiple aggregations + aliases**

In [60]:
df.groupBy(["prod","geo"]).agg(f.sum("val").alias("val_sum"), f.avg("val").alias("val_avg")).show()

+-----+----+------------------+------------------+
| prod| geo|           val_sum|           val_avg|
+-----+----+------------------+------------------+
|prodI|regB| 26614.21261323439|3802.0303733191986|
|prodA|regE|29147.939313541443| 5829.587862708288|
|prodB|regF| 15182.29534355441|3795.5738358886024|
|prodD|regE| 50465.05578580952| 7209.293683687074|
|prodK|regB| 31934.40395342406|  5322.40065890401|
|prodF|regD|30099.764020475784| 6019.952804095157|
|prodH|regE| 9780.791256960383| 3260.263752320128|
|prodB|regA|15173.427162975999| 5057.809054325333|
|prodD|regF|14644.907916090564| 2928.981583218113|
|prodG|regC|34617.725516208724|4327.2156895260905|
|prodB|regE|30647.927342034312| 5107.987890339052|
|prodK|regF|25307.428539388253|  5061.48570787765|
|prodD|regD|32346.278487783064|  5391.04641463051|
|prodM|regC| 7226.084397656817| 7226.084397656817|
|prodL|regA|32890.519567608215| 6578.103913521643|
|prodM|regD| 18403.31832397603| 6134.439441325343|
|prodG|regF|  33858.2808189527|

In [None]:
spark.sql("select prod, geo, sum(val) as val_sum, avg(val) as val_avg from train_df group by prod, geo").show()
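Grouping by several columns and computing several aggregates at once amounts to the following plain-Python computation; the rows are made-up values, and the dictionary keys stand in for the `val_sum`/`val_avg` aliases:

```python
from collections import defaultdict

# made-up (prod, geo, val) rows
rows = [("prodA", "regA", 10.0), ("prodA", "regA", 30.0), ("prodB", "regA", 5.0)]

groups = defaultdict(list)
for prod, geo, val in rows:
    groups[(prod, geo)].append(val)  # groupBy(["prod", "geo"])

# agg(f.sum("val").alias("val_sum"), f.avg("val").alias("val_avg"))
result = {
    key: {"val_sum": sum(vals), "val_avg": sum(vals) / len(vals)}
    for key, vals in groups.items()
}
print(result[("prodA", "regA")])  # {'val_sum': 40.0, 'val_avg': 20.0}
```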

**Joins**

In [61]:
# available join types: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi and left_anti
df.join(geo_df, df.geo == geo_df.geo_id, "inner").show()

+----+-----+------------------+------+--------+
| geo| prod|               val|geo_id|geo_name|
+----+-----+------------------+------+--------+
|regB|prodH|3270.4437860414255|  regB|Region B|
|regB|prodI| 1737.156094854783|  regB|Region B|
|regB|prodL|1174.4495834500963|  regB|Region B|
|regB|prodC|2135.9887738678563|  regB|Region B|
|regB|prodE| 7025.129601579196|  regB|Region B|
|regB|prodC|2510.1022903102203|  regB|Region B|
|regB|prodF|1972.8887469184592|  regB|Region B|
|regB|prodI| 4569.889973649646|  regB|Region B|
|regB|prodK|1917.1117490137888|  regB|Region B|
|regB|prodG| 7952.327964561016|  regB|Region B|
|regB|prodF| 5843.556158034302|  regB|Region B|
|regB|prodC| 8665.418440173988|  regB|Region B|
|regB|prodF| 8934.680656024948|  regB|Region B|
|regB|prodK| 2636.558714419226|  regB|Region B|
|regB|prodM| 4063.049588478553|  regB|Region B|
|regB|prodM| 7823.481881342198|  regB|Region B|
|regB|prodA| 5417.392015707275|  regB|Region B|
|regB|prodC| 8136.395813190857|  regB|Re

In [63]:
df.join(geo_df, df.geo == geo_df.geo_id, "inner").count()

332

In [None]:
q = """select train_df.prod, train_df.geo, train_df.val, geo_df.geo_name \
from train_df inner join geo_df on train_df.geo == geo_df.geo_id"""
spark.sql(q).show()

In [64]:
df.join(geo_df, df.geo == geo_df.geo_id, "outer").show()

+----+-----+------------------+------+--------+
| geo| prod|               val|geo_id|geo_name|
+----+-----+------------------+------+--------+
|regB|prodH|3270.4437860414255|  regB|Region B|
|regB|prodI| 1737.156094854783|  regB|Region B|
|regB|prodL|1174.4495834500963|  regB|Region B|
|regB|prodC|2135.9887738678563|  regB|Region B|
|regB|prodE| 7025.129601579196|  regB|Region B|
|regB|prodC|2510.1022903102203|  regB|Region B|
|regB|prodF|1972.8887469184592|  regB|Region B|
|regB|prodI| 4569.889973649646|  regB|Region B|
|regB|prodK|1917.1117490137888|  regB|Region B|
|regB|prodG| 7952.327964561016|  regB|Region B|
|regB|prodF| 5843.556158034302|  regB|Region B|
|regB|prodC| 8665.418440173988|  regB|Region B|
|regB|prodF| 8934.680656024948|  regB|Region B|
|regB|prodK| 2636.558714419226|  regB|Region B|
|regB|prodM| 4063.049588478553|  regB|Region B|
|regB|prodM| 7823.481881342198|  regB|Region B|
|regB|prodA| 5417.392015707275|  regB|Region B|
|regB|prodC| 8136.395813190857|  regB|Re

In [65]:
df.join(geo_df, df.geo == geo_df.geo_id, "outer").count()

400
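Why 332 rows for the inner join and 400 for the outer one? `geo_df` has no entry for `regF`, so the inner join drops every `regF` row, while the full outer join keeps them with NULLs in `geo_id` and `geo_name`. Every region in `geo_df` does occur in `df`, so no right-only rows are added and the outer join has exactly as many rows as `df`. The hypothetical helpers below sketch both join types in plain Python on a toy example (rows are dicts, `None` stands for NULL):

```python
def inner_join(left, right, lkey, rkey):
    """Pairs of rows whose keys are equal (hypothetical helper)."""
    return [(a, b) for a in left for b in right if a[lkey] == b[rkey]]


def full_outer_join(left, right, lkey, rkey):
    """Matched pairs plus unmatched rows from either side, padded with None."""
    matched = inner_join(left, right, lkey, rkey)
    right_keys = {b[rkey] for b in right}
    left_keys = {a[lkey] for a in left}
    left_only = [(a, None) for a in left if a[lkey] not in right_keys]
    right_only = [(None, b) for b in right if b[rkey] not in left_keys]
    return matched + left_only + right_only


facts = [{"geo": "regA", "val": 1.0}, {"geo": "regA", "val": 2.0}, {"geo": "regF", "val": 3.0}]
regions = [{"geo_id": "regA", "geo_name": "Region A"}, {"geo_id": "regB", "geo_name": "Region B"}]

print(len(inner_join(facts, regions, "geo", "geo_id")))       # 2: the regF row has no match
print(len(full_outer_join(facts, regions, "geo", "geo_id")))  # 4: 2 matched + regF + regB
```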

In [None]:
q = """select train_df.prod, train_df.geo, train_df.val, geo_df.geo_name \
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id"""
spark.sql(q).show()

In [None]:
q = """select train_df.prod, train_df.geo, train_df.val, geo_df.geo_name \
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id"""
spark.sql(q).count()

**Distinct**

In [66]:
df.select(["prod", "geo"]).distinct().show()

+-----+----+
| prod| geo|
+-----+----+
|prodI|regB|
|prodA|regE|
|prodB|regF|
|prodD|regE|
|prodK|regB|
|prodF|regD|
|prodH|regE|
|prodB|regA|
|prodD|regF|
|prodG|regC|
|prodB|regE|
|prodK|regF|
|prodD|regD|
|prodM|regC|
|prodL|regA|
|prodM|regD|
|prodG|regF|
|prodF|regE|
|prodE|regF|
|prodI|regC|
+-----+----+
only showing top 20 rows



In [None]:
spark.sql("select distinct prod, geo from train_df").show()

In [67]:
# alternatively
df.select(["prod", "geo"]).dropDuplicates().show()

+-----+----+
| prod| geo|
+-----+----+
|prodI|regB|
|prodA|regE|
|prodB|regF|
|prodD|regE|
|prodK|regB|
|prodF|regD|
|prodH|regE|
|prodB|regA|
|prodD|regF|
|prodG|regC|
|prodB|regE|
|prodK|regF|
|prodD|regD|
|prodM|regC|
|prodL|regA|
|prodM|regD|
|prodG|regF|
|prodF|regE|
|prodE|regF|
|prodI|regC|
+-----+----+
only showing top 20 rows



**Removing NULLs**

In [68]:
# how="any" drops a row if any column is NULL; how="all" drops it only if every column is NULL
df.join(geo_df, df.geo == geo_df.geo_id, "outer").dropna("any").show()

+----+-----+------------------+------+--------+
| geo| prod|               val|geo_id|geo_name|
+----+-----+------------------+------+--------+
|regB|prodH|3270.4437860414255|  regB|Region B|
|regB|prodI| 1737.156094854783|  regB|Region B|
|regB|prodL|1174.4495834500963|  regB|Region B|
|regB|prodC|2135.9887738678563|  regB|Region B|
|regB|prodE| 7025.129601579196|  regB|Region B|
|regB|prodC|2510.1022903102203|  regB|Region B|
|regB|prodF|1972.8887469184592|  regB|Region B|
|regB|prodI| 4569.889973649646|  regB|Region B|
|regB|prodK|1917.1117490137888|  regB|Region B|
|regB|prodG| 7952.327964561016|  regB|Region B|
|regB|prodF| 5843.556158034302|  regB|Region B|
|regB|prodC| 8665.418440173988|  regB|Region B|
|regB|prodF| 8934.680656024948|  regB|Region B|
|regB|prodK| 2636.558714419226|  regB|Region B|
|regB|prodM| 4063.049588478553|  regB|Region B|
|regB|prodM| 7823.481881342198|  regB|Region B|
|regB|prodA| 5417.392015707275|  regB|Region B|
|regB|prodC| 8136.395813190857|  regB|Re

In [69]:
df.join(geo_df, df.geo == geo_df.geo_id, "outer").dropna("any").count()

332

In [None]:
q = """select * \
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id \
where geo_name is not null"""
spark.sql(q).show()

In [None]:
q = """select * \
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id \
where geo_name is not null"""
spark.sql(q).count()
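`how="any"` removes a row as soon as one column is NULL, `how="all"` only when every column is NULL (PySpark's `dropna` also accepts `thresh` and `subset`). The local sketch below, with a hypothetical `dropna` helper over dictionaries where `None` stands for NULL, shows the difference:

```python
def dropna(rows, how="any"):
    """Local sketch of DataFrame.dropna(how=...) (hypothetical helper)."""
    if how == "any":
        # keep a row only if none of its values is NULL
        return [r for r in rows if all(v is not None for v in r.values())]
    if how == "all":
        # drop a row only if all of its values are NULL
        return [r for r in rows if any(v is not None for v in r.values())]
    raise ValueError("how must be 'any' or 'all'")


rows = [
    {"geo": "regB", "geo_name": "Region B"},
    {"geo": "regF", "geo_name": None},  # unmatched row of the outer join
    {"geo": None, "geo_name": None},
]
print(len(dropna(rows, "any")))  # 1
print(len(dropna(rows, "all")))  # 2
```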

**Replacing NULLs**

In [70]:
# a string value only fills NULLs in string columns (here geo_id and geo_name)
df.join(geo_df, df.geo == geo_df.geo_id, "outer").fillna("replacement").where("geo_name == 'replacement'").show()

+----+-----+------------------+-----------+-----------+
| geo| prod|               val|     geo_id|   geo_name|
+----+-----+------------------+-----------+-----------+
|regF|prodB| 3941.612243931777|replacement|replacement|
|regF|prodE| 7768.088411993063|replacement|replacement|
|regF|prodE| 5854.771431302804|replacement|replacement|
|regF|prodM| 4149.643093481655|replacement|replacement|
|regF|prodK| 6851.296839248875|replacement|replacement|
|regF|prodE| 6465.227445018385|replacement|replacement|
|regF|prodE| 7731.330867597572|replacement|replacement|
|regF|prodJ| 2030.795842410079|replacement|replacement|
|regF|prodH| 7435.132345445216|replacement|replacement|
|regF|prodH| 8646.180642996196|replacement|replacement|
|regF|prodL| 7604.157736120527|replacement|replacement|
|regF|prodJ| 5351.477534948765|replacement|replacement|
|regF|prodG| 3415.559842443028|replacement|replacement|
|regF|prodK|1258.2999799308063|replacement|replacement|
|regF|prodI| 8922.879817824029|replacement|repla

In [None]:
q = """select prod, geo, val, if(geo_name is null, 'replacement', geo_name) as geo_name \
from train_df full outer join geo_df on train_df.geo == geo_df.geo_id \
order by geo_name desc"""
spark.sql(q).show()

**Replacing values**

In [71]:
df.replace("prodA", "Product A").show()

+----+---------+------------------+
| geo|     prod|               val|
+----+---------+------------------+
|regF|    prodB| 3941.612243931777|
|regA|    prodD| 7851.067371046244|
|regA|    prodF| 4412.135940077227|
|regF|    prodE| 7768.088411993063|
|regC|    prodC| 8487.318566397455|
|regB|    prodH|3270.4437860414255|
|regB|    prodI| 1737.156094854783|
|regB|    prodL|1174.4495834500963|
|regF|    prodE| 5854.771431302804|
|regA|    prodJ| 9999.170456582882|
|regF|    prodM| 4149.643093481655|
|regF|    prodK| 6851.296839248875|
|regE|    prodI|8031.0974464980545|
|regA|Product A| 6865.791897195005|
|regE|    prodK| 7788.098836535736|
|regD|    prodI|   9546.5055944439|
|regA|    prodE|2794.2461412627963|
|regA|    prodK|1183.4201558829911|
|regA|    prodB|2371.4411120631135|
|regB|    prodC|2135.9887738678563|
+----+---------+------------------+
only showing top 20 rows



In [72]:
# df itself is unchanged: replace() returned a new DataFrame (DataFrames are immutable)
df.show()

+----+-----+------------------+
| geo| prod|               val|
+----+-----+------------------+
|regF|prodB| 3941.612243931777|
|regA|prodD| 7851.067371046244|
|regA|prodF| 4412.135940077227|
|regF|prodE| 7768.088411993063|
|regC|prodC| 8487.318566397455|
|regB|prodH|3270.4437860414255|
|regB|prodI| 1737.156094854783|
|regB|prodL|1174.4495834500963|
|regF|prodE| 5854.771431302804|
|regA|prodJ| 9999.170456582882|
|regF|prodM| 4149.643093481655|
|regF|prodK| 6851.296839248875|
|regE|prodI|8031.0974464980545|
|regA|prodA| 6865.791897195005|
|regE|prodK| 7788.098836535736|
|regD|prodI|   9546.5055944439|
|regA|prodE|2794.2461412627963|
|regA|prodK|1183.4201558829911|
|regA|prodB|2371.4411120631135|
|regB|prodC|2135.9887738678563|
+----+-----+------------------+
only showing top 20 rows



In [None]:
spark.sql("select geo, regexp_replace(prod, 'prodA', 'Product A') as prod, val from train_df").show()

**Renaming columns**

In [73]:
df.withColumnRenamed("val", "volume").show()

+----+-----+------------------+
| geo| prod|            volume|
+----+-----+------------------+
|regF|prodB| 3941.612243931777|
|regA|prodD| 7851.067371046244|
|regA|prodF| 4412.135940077227|
|regF|prodE| 7768.088411993063|
|regC|prodC| 8487.318566397455|
|regB|prodH|3270.4437860414255|
|regB|prodI| 1737.156094854783|
|regB|prodL|1174.4495834500963|
|regF|prodE| 5854.771431302804|
|regA|prodJ| 9999.170456582882|
|regF|prodM| 4149.643093481655|
|regF|prodK| 6851.296839248875|
|regE|prodI|8031.0974464980545|
|regA|prodA| 6865.791897195005|
|regE|prodK| 7788.098836535736|
|regD|prodI|   9546.5055944439|
|regA|prodE|2794.2461412627963|
|regA|prodK|1183.4201558829911|
|regA|prodB|2371.4411120631135|
|regB|prodC|2135.9887738678563|
+----+-----+------------------+
only showing top 20 rows



In [None]:
spark.sql("select prod, geo, val as volume from train_df").show()

**Creating a new column**

In [75]:
df.withColumn("val2", df["val"] / 100).show()

+----+-----+------------------+------------------+
| geo| prod|               val|              val2|
+----+-----+------------------+------------------+
|regF|prodB| 3941.612243931777| 39.41612243931777|
|regA|prodD| 7851.067371046244| 78.51067371046244|
|regA|prodF| 4412.135940077227| 44.12135940077227|
|regF|prodE| 7768.088411993063| 77.68088411993064|
|regC|prodC| 8487.318566397455| 84.87318566397454|
|regB|prodH|3270.4437860414255| 32.70443786041425|
|regB|prodI| 1737.156094854783| 17.37156094854783|
|regB|prodL|1174.4495834500963|11.744495834500963|
|regF|prodE| 5854.771431302804| 58.54771431302804|
|regA|prodJ| 9999.170456582882| 99.99170456582883|
|regF|prodM| 4149.643093481655| 41.49643093481655|
|regF|prodK| 6851.296839248875| 68.51296839248874|
|regE|prodI|8031.0974464980545| 80.31097446498055|
|regA|prodA| 6865.791897195005| 68.65791897195005|
|regE|prodK| 7788.098836535736| 77.88098836535737|
|regD|prodI|   9546.5055944439| 95.46505594443899|
|regA|prodE|2794.2461412627963|

In [None]:
spark.sql("select prod, geo, val, val/100 as val2 from train_df").show()

**When**

In [76]:
df.select(df.prod, f.when(df.val > 7500, 1).when(df.val < 2500, 3).otherwise(2).alias("out")).show()

+-----+---+
| prod|out|
+-----+---+
|prodB|  2|
|prodD|  1|
|prodF|  2|
|prodE|  1|
|prodC|  1|
|prodH|  2|
|prodI|  3|
|prodL|  3|
|prodE|  2|
|prodJ|  1|
|prodM|  2|
|prodK|  2|
|prodI|  1|
|prodA|  2|
|prodK|  1|
|prodI|  1|
|prodE|  2|
|prodK|  3|
|prodB|  3|
|prodC|  3|
+-----+---+
only showing top 20 rows



In [None]:
q = """select prod, case when val > 7500 then 1 \
when val <= 7500 and val >= 2500 then 2 \
when val < 2500 then 3 end as out from train_df"""
spark.sql(q).show()
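The `when` chain is evaluated top to bottom and the first matching condition wins, which is why the DataFrame version needs no explicit middle range while the SQL `CASE` spells it out. A plain-Python equivalent (`bucket` is a hypothetical helper):

```python
def bucket(val):
    """Plain-Python version of f.when(val > 7500, 1).when(val < 2500, 3).otherwise(2)."""
    if val > 7500:  # first when(): checked first
        return 1
    if val < 2500:  # second when(): only reached if the first did not match
        return 3
    return 2        # otherwise()


print([bucket(v) for v in (3941.6, 7851.1, 1737.2, 7500, 2500)])  # [2, 1, 3, 2, 2]
```

One subtle difference between the two cells: for a NULL `val` both conditions evaluate to NULL, so the DataFrame version falls through to `otherwise(2)`, whereas the SQL `CASE` without an `ELSE` branch returns NULL. The sample data contains no NULLs, so both give the same result here.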

**Substring**

In [77]:
df.select(df.prod.substr(2, 4).alias("out")).show()

+----+
| out|
+----+
|rodB|
|rodD|
|rodF|
|rodE|
|rodC|
|rodH|
|rodI|
|rodL|
|rodE|
|rodJ|
|rodM|
|rodK|
|rodI|
|rodA|
|rodK|
|rodI|
|rodE|
|rodK|
|rodB|
|rodC|
+----+
only showing top 20 rows



In [None]:
spark.sql("select substring(prod, 2, 4) as out from train_df").show()
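Both `substr(2, 4)` and SQL `substring(prod, 2, 4)` take a 1-based start position and a length, which is why `prodB` becomes `rodB`. Python slicing is 0-based with an exclusive end, so a local equivalent has to shift the start; the hypothetical `substr` helper below only covers `pos >= 1`:

```python
def substr(s, pos, length):
    """Local sketch of a 1-based substr(pos, length) for pos >= 1 (hypothetical helper)."""
    start = pos - 1                 # 1-based position -> 0-based index
    return s[start:start + length]  # Python's slice end is exclusive


print(substr("prodB", 2, 4))   # rodB
print(substr("prodB", 4, 10))  # dB (a length past the end is simply truncated)
```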

**Analytic functions (window functions)**

In [37]:
from pyspark.sql.window import Window

In [79]:
windowSpec = Window.partitionBy('prod')  # like groupBy, but every row is kept instead of collapsing each group into one row

df.select("prod", "val", f.sum("val").over(windowSpec).alias("prod_val")).show()

+-----+------------------+------------------+
| prod|               val|          prod_val|
+-----+------------------+------------------+
|prodE| 7768.088411993063|200344.20973732683|
|prodE| 5854.771431302804|200344.20973732683|
|prodE|2794.2461412627963|200344.20973732683|
|prodE| 7025.129601579196|200344.20973732683|
|prodE| 6465.227445018385|200344.20973732683|
|prodE| 7731.330867597572|200344.20973732683|
|prodE| 9623.555543853063|200344.20973732683|
|prodE|1683.9683005875395|200344.20973732683|
|prodE| 8652.639151799167|200344.20973732683|
|prodE| 4335.137885962506|200344.20973732683|
|prodE|1779.9660825010696|200344.20973732683|
|prodE| 3977.121674763371|200344.20973732683|
|prodE| 7521.265009837464|200344.20973732683|
|prodE|  5807.21580340897|200344.20973732683|
|prodE| 5980.230701187475|200344.20973732683|
|prodE|1462.3904546522006|200344.20973732683|
|prodE| 8399.040807694873|200344.20973732683|
|prodE| 4984.489036173954|200344.20973732683|
|prodE| 8090.452239820582|200344.2

In [None]:
spark.sql("select prod, val, sum(val) over (partition by prod) as prod_val from train_df").show()
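A window with only `partitionBy` (no `orderBy`) makes `sum` cover the whole partition, so each row receives its product's total. The same result computed locally in two passes (plain Python, made-up rows):

```python
from collections import defaultdict

rows = [("prodE", 10.0), ("prodE", 5.0), ("prodA", 2.5)]  # made-up (prod, val) rows

# 1st pass: aggregate each partition, like groupBy("prod").sum("val")
totals = defaultdict(float)
for prod, val in rows:
    totals[prod] += val

# 2nd pass: unlike groupBy, keep every input row and attach its partition's total,
# like f.sum("val").over(Window.partitionBy("prod"))
with_totals = [(prod, val, totals[prod]) for prod, val in rows]
print(with_totals)  # [('prodE', 10.0, 15.0), ('prodE', 5.0, 15.0), ('prodA', 2.5, 2.5)]
```

Adding `orderBy` to the window changes the default frame to "from the start of the partition up to the current row", which turns the same expression into a running total.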

In [80]:
spark.stop()

### Exercises

**Imports and data preparation**

In [81]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as f
from pyspark.sql.window import Window
import random

In [82]:
spark = SparkSession.builder.appName('my_app').master("local[*]").getOrCreate()

In [83]:
goBike = spark.read.csv("./2017-fordgobike-tripdata.csv", header=True, inferSchema=True)

In [84]:
goBike.printSchema()

root
 |-- duration_sec: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: double (nullable = true)
 |-- start_station_longitude: double (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: double (nullable = true)
 |-- end_station_longitude: double (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: integer (nullable = true)
 |-- member_gender: string (nullable = true)



In [90]:
goBike.count()

519700

In [91]:
len(goBike.columns)

15

> **TASK 1**: Explore the data:
1. Drop the rows that contain NULLs
2. Check the distribution of the "member_gender" variable
3. Compute the minimum, maximum, and mean age of the people renting bikes
4. Compute the number of unique bikes
5. Compute the number of unique stations
6. Find which bike was rented for the longest and which for the shortest total time over the analyzed period (and for how long)
7. Compute the average duration of a single rental
8. Find between which stations the traffic was heaviest (hint: A -> B == B -> A)
9. Find at which hour of the day the most bikes were rented
10. Compute the *average number of rentals* for each day of the week (hint: java.text.SimpleDateFormat)
11. **⋆** Compute the average distance (in km) between the start and end station across all rentals

### 1

In [93]:
goBike = goBike.dropna()

In [94]:
goBike.count()

453159

### 2

In [101]:
goBike.groupBy('member_gender').count().show()

+-------------+------+
|member_gender| count|
+-------------+------+
|       Female| 98542|
|        Other|  6299|
|         Male|348318|
+-------------+------+



In [98]:
goBike.select('member_gender').distinct().show()

+-------------+
|member_gender|
+-------------+
|       Female|
|        Other|
|         Male|
+-------------+



### 3

In [106]:
goBike = goBike.withColumn('age',2018 - goBike["member_birth_year"])

In [107]:
goBike.columns

['duration_sec',
 'start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'member_birth_year',
 'member_gender',
 'age']

In [108]:
goBike.select('age').describe().show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|            453159|
|   mean|37.595212717831934|
| stddev|10.513487539908406|
|    min|                19|
|    max|               132|
+-------+------------------+



### or

In [117]:
goBike.select((2018 - goBike["member_birth_year"]).alias('age'))\
.agg(f.min('age').alias('min_age'),f.max('age').alias('max_age'),f.avg('age').alias('avg_age')).show()

+-------+-------+------------------+
|min_age|max_age|           avg_age|
+-------+-------+------------------+
|     19|    132|37.595212717831934|
+-------+-------+------------------+



### 4

In [122]:
goBike.select('bike_id').distinct().count()

3670

### 5

In [128]:
goBike.select('start_station_id')\
.union(goBike.select('end_station_id')).dropDuplicates().count()

272

### 6

In [137]:
goBike.agg(f.max('duration_sec'))

DataFrame[max(duration_sec): int]

In [160]:
goBike.groupBy('bike_id').agg(f.sum('duration_sec').alias('sum_sec'))\
.orderBy('sum_sec',ascending=False).take(1)

[Row(bike_id=2178, sum_sec=377954)]

In [159]:
goBike.groupBy('bike_id').agg(f.sum('duration_sec').alias('sum_sec'))\
.orderBy('sum_sec',ascending=True).take(1)

[Row(bike_id=2609, sum_sec=74)]

### or

In [161]:
values = goBike.groupBy('bike_id').agg(f.sum('duration_sec').alias('sum_sec'))\
.orderBy('sum_sec',ascending=True).collect()

In [164]:
values[0]

Row(bike_id=2609, sum_sec=74)

In [166]:
values[-1]

Row(bike_id=2178, sum_sec=377954)

In [168]:
values[-1].sum_sec

377954

### 7

In [185]:
goBike.agg(f.round(f.avg('duration_sec'),1)).show()

+---------------------------+
|round(avg(duration_sec), 1)|
+---------------------------+
|                      832.9|
+---------------------------+



### 8

In [175]:
goBike.groupBy('start_station_id','end_station_id').count().show()

+----------------+--------------+-----+
|start_station_id|end_station_id|count|
+----------------+--------------+-----+
|             259|           259|   16|
|             193|           196|   74|
|              15|            14|   70|
|              28|            16|  224|
|             181|           198|   53|
|             237|           227|    7|
|             108|           112|   92|
|             163|             7|   58|
|              15|            26|   46|
|             133|           323|    2|
|              61|            13|   10|
|             195|           179|   27|
|             114|           223|   43|
|               3|            22|  158|
|              75|            97|   34|
|              58|            81|   93|
|             295|           295|   11|
|              63|            81|   20|
|             127|           109|   75|
|              43|           100|   13|
+----------------+--------------+-----+
only showing top 20 rows



In [182]:
start_end = goBike.where(goBike['start_station_id'] <= goBike['end_station_id'])

In [183]:
end_start= goBike.where(goBike['start_station_id'] > goBike['end_station_id'])

In [188]:
end_start.select('end_station_id','start_station_id')

DataFrame[end_station_id: int, start_station_id: int]

In [199]:
start_end.select('start_station_id','end_station_id').union(end_start.select('end_station_id','start_station_id'))\
.groupBy('start_station_id','end_station_id').count().orderBy('count',ascending=False).take(1)

[Row(start_station_id=6, end_station_id=15, count=3282)]

### 9

In [209]:
count_hour = goBike.select(f.hour('start_time').alias('hour')).groupBy('hour').count()

In [210]:
count_hour.columns

['hour', 'count']

In [213]:
count_hour.orderBy('count',ascending=False).take(1)

[Row(hour=8, count=54678)]

### 10

In [224]:
day_week = goBike.select(f.dayofweek('start_time').alias('day'),f.weekofyear('start_time').alias('week'))\
.groupBy('day','week').count()

In [225]:
day_week.show()

+---+----+-----+
|day|week|count|
+---+----+-----+
|  5|  49| 4066|
|  5|  40| 3847|
|  3|  30| 1845|
|  6|  37| 3273|
|  7|  33| 1289|
|  5|  33| 2831|
|  1|  37| 1795|
|  2|  45| 3860|
|  2|  40| 3752|
|  4|  47| 2588|
|  7|  37| 1811|
|  1|  46| 1477|
|  7|  31| 1050|
|  3|  43| 4070|
|  6|  42| 3139|
|  3|  41| 3653|
|  2|  49| 3821|
|  1|  32| 1177|
|  1|  49| 1614|
|  4|  27|  974|
+---+----+-----+
only showing top 20 rows



In [227]:
day_week.groupBy('day').agg(f.avg('count')).show()

+---+------------------+
|day|        avg(count)|
+---+------------------+
|  1|1214.3703703703704|
|  6|2667.3333333333335|
|  3|3095.4615384615386|
|  5| 2859.185185185185|
|  4| 2976.185185185185|
|  7|1362.2222222222222|
|  2|2828.3076923076924|
+---+------------------+



### 11

In [228]:
goBike.columns

['duration_sec',
 'start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'member_birth_year',
 'member_gender',
 'age']

In [231]:
goBike.select('end_station_latitude').describe().show()

+-------+--------------------+
|summary|end_station_latitude|
+-------+--------------------+
|  count|              453159|
|   mean|   37.77209035033914|
| stddev| 0.08481063344027019|
|    min|          37.3172979|
|    max|   37.88022244590679|
+-------+--------------------+



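In [None]:
# withColumn() expects a Column, so a constant has to be wrapped in f.lit();
# 2 * 6378.14 is only the constant factor of the haversine formula below
goBike.withColumn('distance', f.lit(2 * 6378.14))

In [None]:
# note: `df` is only created later, in the "Data preparation" section
df.withColumn("val2", df["val"] / 100).show()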

In [None]:

import numpy as np

def haversine(pt1, pt2):
    '''
    The haversine formula determines the great-circle distance 
    between two points on a sphere given their longitudes and latitudes. 
    '''
    r = 6378.14
    (lat1,lon1) = np.deg2rad(pt1)
    (lat2,lon2) = np.deg2rad(pt2)
    
    sin_lat = np.sin((lat2-lat1)/2)
    sin_lon = np.sin((lon2-lon1)/2)
    
    return 2 * r * np.arcsin(np.sqrt(sin_lat * sin_lat + np.cos(lat1) * np.cos(lat2) * sin_lon * sin_lon))

> **TASK 2**: Create a DataFrame `dataDaily` containing the data aggregated to the day level. It should contain the following information (columns):
- 'date' : the date
- 'avg_duration_sec' : the average rental duration on that day
- 'n_trips' : the number of rentals on that day
- 'n_bikes' : the number of unique bikes used on that day
- 'n_routes' : the number of unique station combinations (x -> y == y -> x) on that day
- 'n_subscriber' : the number of rentals made by subscribers on that day

In [237]:
goBike = goBike.withColumn('date',f.date_format('start_time','dd.MM.yy'))

In [238]:
goBike.columns

['duration_sec',
 'start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'member_birth_year',
 'member_gender',
 'age',
 'date']

In [277]:
dataDaily = goBike.groupBy('date').agg(f.avg('duration_sec').alias('avg_duration_sec'),
                                       f.count('*').alias('n_trips'),f.countDistinct('bike_id').alias('n_bikes'),
                                       f.sum(f.when(goBike.user_type == 'Subscriber',1).otherwise(0)).alias('N_subscriber'))

In [278]:
dataDaily.columns

['date', 'avg_duration_sec', 'n_trips', 'n_bikes', 'N_subscriber']

In [279]:
start_end = goBike.where(goBike['start_station_id'] <= goBike['end_station_id'])

In [280]:
end_start= goBike.where(goBike['start_station_id'] > goBike['end_station_id'])

In [281]:
end_start.select('end_station_id','start_station_id')

DataFrame[end_station_id: int, start_station_id: int]

In [282]:
unique_routes = start_end.select('start_station_id','end_station_id','date').union(end_start.select('end_station_id','start_station_id','date'))\
    .dropDuplicates().groupBy('date').agg(f.count('*').alias('n_routes'))

In [283]:
unique_routes.show()

+--------+--------+
|    date|n_routes|
+--------+--------+
|21.12.17|    1540|
|17.09.17|    1062|
|04.08.17|    1000|
|05.08.17|     635|
|15.12.17|    1886|
|17.12.17|     994|
|07.07.17|     361|
|03.12.17|    1051|
|09.09.17|    1066|
|13.12.17|    1963|
|06.08.17|     583|
|17.11.17|    1904|
|20.09.17|    1740|
|13.09.17|    1680|
|27.10.17|    1961|
|08.10.17|     958|
|12.10.17|    1702|
|12.12.17|    1967|
|01.10.17|    1037|
|08.12.17|    1892|
+--------+--------+
only showing top 20 rows



In [285]:
dataDaily = dataDaily.join(unique_routes,'date')
dataDaily.show()

+--------+------------------+-------+-------+------------+--------+
|    date|  avg_duration_sec|n_trips|n_bikes|N_subscriber|n_routes|
+--------+------------------+-------+-------+------------+--------+
|04.08.17| 860.2129807692307|   2080|    882|        1776|    1000|
|17.09.17|1275.0027855153203|   1795|    953|        1226|    1062|
|21.12.17| 669.1406695156695|   2808|   1284|        2671|    1540|
|05.08.17|1264.4980952380952|   1050|    606|         748|     635|
|15.12.17| 763.8888888888889|   3663|   1431|        3396|    1886|
|07.07.17| 873.8014042126379|    997|    392|         932|     361|
|17.12.17|1075.1111869031379|   1466|    876|        1275|     994|
|03.12.17| 874.0664183736809|   1611|    923|        1375|    1051|
|09.09.17|1329.9659284497445|   1761|    915|        1283|    1066|
|13.12.17| 659.8088491175739|   4023|   1473|        3788|    1963|
|06.08.17| 1210.444562899787|    938|    564|         667|     583|
|13.09.17| 717.3768978862756|   3359|   1278|   

**Data preparation**

In [62]:
random.seed(42)
geo_id = [random.choice(["regA","regB","regC","regD","regE","regF"]) for x in range(400)]
prod_id = [random.choice(["prodA","prodB","prodC","prodD","prodE","prodF",
                          "prodG","prodH","prodI","prodJ","prodK","prodL","prodM"]) for x in range(400)]
date = [random.choice(["2015-","2016-","2017-"]) + 
        random.choice(["01-","02-","03-","04-","05-","06-","07-","08-","09-","10-","11-","12-"]) + "01"
        for x in range(400)]
value = [random.uniform(10000,100000) for x in range(400)]
volume = [random.uniform(1000,10000) for x in range(400)]

In [63]:
df = spark.createDataFrame([Row(prod=p, geo=g, val=v, vol=vl, dt=d) 
                            for p,g,v,vl,d in zip(prod_id, geo_id, value, volume, date)])

In [64]:
df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- prod: string (nullable = true)
 |-- val: double (nullable = true)
 |-- vol: double (nullable = true)



In [65]:
df = df.withColumn("dt", f.to_date(df["dt"]))

In [66]:
df.printSchema()

root
 |-- dt: date (nullable = true)
 |-- geo: string (nullable = true)
 |-- prod: string (nullable = true)
 |-- val: double (nullable = true)
 |-- vol: double (nullable = true)



In [23]:
df.createOrReplaceTempView("df")

In [24]:
df.show()

+----------+----+-----+------------------+------------------+
|        dt| geo| prod|               val|               vol|
+----------+----+-----+------------------+------------------+
|2016-06-01|regF|prodB| 95087.40786730956| 5119.116197026179|
|2016-05-01|regA|prodD|13902.853320925453| 3640.983140575281|
|2016-05-01|regA|prodF| 80490.42963823416| 1394.256480932108|
|2015-08-01|regF|prodE| 88028.28169838544| 2795.228503436695|
|2015-12-01|regC|prodC| 56930.60932417757| 1377.153477373438|
|2017-01-01|regB|prodH|51223.826988791385| 9400.338819553575|
|2016-04-01|regB|prodI| 96762.35648098258| 5638.452303290489|
|2017-02-01|regB|prodL|15474.286674505525|  9902.10432066511|
|2017-01-01|regF|prodE|  53108.3719898527| 5887.276278887673|
|2015-04-01|regA|prodJ| 46145.55290613044|3279.8238868235567|
|2015-01-01|regF|prodM| 71748.77464560093| 7779.618269369784|
|2017-03-01|regF|prodK|54124.196872980276| 2719.930876605198|
|2015-03-01|regE|prodI| 91873.07462037064| 4212.767584318271|
|2016-11

> **TASK 3:**
1. Compute the (global) average price of the products
2. Compute the total value for each region per month
3. Compute the share of each region-product combination in the product's total volume (what part of a given product's total volume is generated in a given region)
4. Create a column `flag` containing `True` when the region and product names end with the same letter, and `False` otherwise (overwrite df so that it contains the new column)
5. Compute the product of value and volume when `flag` is `True`; otherwise return 0
6. Create a column with the year extracted from the date

In [31]:
#1
df_sum = df.groupBy('prod').agg(f.sum('val').alias('sum_val'),f.sum('vol').alias('sum_vol'))
df_sum.withColumn('avg_val',df_sum['sum_val']/df_sum['sum_vol']).show()

+-----+------------------+------------------+------------------+
| prod|           sum_val|           sum_vol|           avg_val|
+-----+------------------+------------------+------------------+
|prodE|1951480.6743014117| 184158.5712438623|10.596740956017008|
|prodM|1823820.0376677099|202986.63483985714| 8.984926712571898|
|prodL|  1797693.02778563|184257.72584209952| 9.756405163310063|
|prodI| 1787783.648304336|174413.81151260604|10.250241267017556|
|prodB|1371597.8375560446| 157216.1869349042|  8.72427874188272|
|prodG| 2257371.517211618| 262849.8421537546| 8.588064952655225|
|prodC| 1722946.964618383|163359.19703885887|10.546984778631955|
|prodJ|1248958.8517933392|148945.79296129555| 8.385324801472498|
|prodA|1550799.9084993473|156561.38992964628| 9.905379028611254|
|prodD|1509002.4397189445|178922.07052249167| 8.433852991485772|
|prodF| 1439529.328777088|136084.36978225425|10.578212112680163|
|prodK|1711504.8470177962|183268.55596059817| 9.338780665602894|
|prodH| 1519910.340914309

In [33]:
#2
df.groupBy('geo','dt').sum('val').orderBy('geo','dt').show()

+----+----------+------------------+
| geo|        dt|          sum(val)|
+----+----------+------------------+
|regA|2015-01-01| 76166.82863310802|
|regA|2015-02-01|155261.15709983977|
|regA|2015-03-01|163084.09896991577|
|regA|2015-04-01| 97789.60220958495|
|regA|2015-05-01|  148582.374710774|
|regA|2015-06-01|123526.96146645347|
|regA|2015-07-01|121993.10654308298|
|regA|2015-08-01|  99556.6117479058|
|regA|2015-09-01| 91570.84274473095|
|regA|2015-10-01|210166.80806373555|
|regA|2015-11-01| 57471.38096153344|
|regA|2015-12-01|111841.62910877481|
|regA|2016-02-01| 50814.90126167026|
|regA|2016-03-01|113431.95468646797|
|regA|2016-04-01| 39926.67475347864|
|regA|2016-05-01|247362.35939429235|
|regA|2016-06-01|196490.67133782775|
|regA|2016-07-01| 76491.63986398441|
|regA|2016-08-01|100456.22453586152|
|regA|2016-09-01|111773.61490563425|
+----+----------+------------------+
only showing top 20 rows



In [38]:
#3
windowSpec = Window.partitionBy('prod')

step1 = df.groupBy(['prod','geo']).agg(f.sum('vol').alias('prod_geo_vol'))\
.select('prod','geo','prod_geo_vol',f.sum('prod_geo_vol').over(windowSpec).alias('prod_vol'))

In [39]:
step1.show()

+-----+----+------------------+------------------+
| prod| geo|      prod_geo_vol|          prod_vol|
+-----+----+------------------+------------------+
|prodE|regF|45626.348915020906|184158.57124386227|
|prodE|regB| 29266.72538672742|184158.57124386227|
|prodE|regD|16189.072187292655|184158.57124386227|
|prodE|regC| 9562.599738735049|184158.57124386227|
|prodE|regA|56972.673496617695|184158.57124386227|
|prodE|regE| 26541.15151946855|184158.57124386227|
|prodM|regC| 7406.281936064914|202986.63483985714|
|prodM|regD| 16226.56288030393|202986.63483985714|
|prodM|regA| 60681.85156695515|202986.63483985714|
|prodM|regB|63458.168881366684|202986.63483985714|
|prodM|regE|10466.021919063438|202986.63483985714|
|prodM|regF| 44747.74765610305|202986.63483985714|
|prodL|regA|18254.985976197902|184257.72584209955|
|prodL|regE| 19839.61645806826|184257.72584209955|
|prodL|regD|53726.753410583726|184257.72584209955|
|prodL|regF| 35923.04327550772|184257.72584209955|
|prodL|regC|27077.076693677303|

In [41]:
step1.withColumn('ratio',step1['prod_geo_vol'] / step1['prod_vol']).orderBy('prod','geo').show()

+-----+----+------------------+------------------+-------------------+
| prod| geo|      prod_geo_vol|          prod_vol|              ratio|
+-----+----+------------------+------------------+-------------------+
|prodA|regA|  57461.9397984923|156561.38992964628| 0.3670249722764589|
|prodA|regB| 8743.846246073634|156561.38992964628|0.05584931412529514|
|prodA|regC| 14053.92990636366|156561.38992964628|0.08976625662737825|
|prodA|regD| 37202.53450475852|156561.38992964628|0.23762266368148724|
|prodA|regE| 33270.82691436357|156561.38992964628|0.21250978245220245|
|prodA|regF| 5828.312559594595|156561.38992964628|0.03722701083717801|
|prodB|regA| 15352.48052298814| 157216.1869349042|0.09765203457926937|
|prodB|regB| 19723.26114147876| 157216.1869349042|0.12545311984729177|
|prodB|regC| 35909.90666722029| 157216.1869349042|0.22841100122908392|
|prodB|regD|16869.660292353863| 157216.1869349042|0.10730231168460276|
|prodB|regE| 45142.00400298924| 157216.1869349042| 0.2871333091272619|
|prodB

In [68]:
#4
df = df.withColumn('flag', df.prod.substr(-1, 1) == df.geo.substr(-1, 1))  # boolean column: True when the last letters match

In [69]:
df.show()

+----------+----+-----+------------------+------------------+-----+
|        dt| geo| prod|               val|               vol| flag|
+----------+----+-----+------------------+------------------+-----+
|2016-06-01|regF|prodB| 95087.40786730956| 5119.116197026179|false|
|2016-05-01|regA|prodD|13902.853320925453| 3640.983140575281|false|
|2016-05-01|regA|prodF| 80490.42963823416| 1394.256480932108|false|
|2015-08-01|regF|prodE| 88028.28169838544| 2795.228503436695|false|
|2015-12-01|regC|prodC| 56930.60932417757| 1377.153477373438| true|
|2017-01-01|regB|prodH|51223.826988791385| 9400.338819553575|false|
|2016-04-01|regB|prodI| 96762.35648098258| 5638.452303290489|false|
|2017-02-01|regB|prodL|15474.286674505525|  9902.10432066511|false|
|2017-01-01|regF|prodE|  53108.3719898527| 5887.276278887673|false|
|2015-04-01|regA|prodJ| 46145.55290613044|3279.8238868235567|false|
|2015-01-01|regF|prodM| 71748.77464560093| 7779.618269369784|false|
|2017-03-01|regF|prodK|54124.196872980276| 2719.

In [71]:
#5
df.withColumn('vol_val',f.when(df['flag']==True,df['val'] * df['vol']).otherwise(0)).show()

+----------+----+-----+------------------+------------------+-----+-------------------+
|        dt| geo| prod|               val|               vol| flag|            vol_val|
+----------+----+-----+------------------+------------------+-----+-------------------+
|2016-06-01|regF|prodB| 95087.40786730956| 5119.116197026179|false|                0.0|
|2016-05-01|regA|prodD|13902.853320925453| 3640.983140575281|false|                0.0|
|2016-05-01|regA|prodF| 80490.42963823416| 1394.256480932108|false|                0.0|
|2015-08-01|regF|prodE| 88028.28169838544| 2795.228503436695|false|                0.0|
|2015-12-01|regC|prodC| 56930.60932417757| 1377.153477373438| true| 7.84021865997798E7|
|2017-01-01|regB|prodH|51223.826988791385| 9400.338819553575|false|                0.0|
|2016-04-01|regB|prodI| 96762.35648098258| 5638.452303290489|false|                0.0|
|2017-02-01|regB|prodL|15474.286674505525|  9902.10432066511|false|                0.0|
|2017-01-01|regF|prodE|  53108.3

In [59]:
#6
df.withColumn('year',f.year('dt')).show()

+----------+----+-----+------------------+------------------+-----+----+
|        dt| geo| prod|               val|               vol| flag|year|
+----------+----+-----+------------------+------------------+-----+----+
|2016-06-01|regF|prodB| 95087.40786730956| 5119.116197026179|False|2016|
|2016-05-01|regA|prodD|13902.853320925453| 3640.983140575281|False|2016|
|2016-05-01|regA|prodF| 80490.42963823416| 1394.256480932108|False|2016|
|2015-08-01|regF|prodE| 88028.28169838544| 2795.228503436695|False|2015|
|2015-12-01|regC|prodC| 56930.60932417757| 1377.153477373438| True|2015|
|2017-01-01|regB|prodH|51223.826988791385| 9400.338819553575|False|2017|
|2016-04-01|regB|prodI| 96762.35648098258| 5638.452303290489|False|2016|
|2017-02-01|regB|prodL|15474.286674505525|  9902.10432066511|False|2017|
|2017-01-01|regF|prodE|  53108.3719898527| 5887.276278887673|False|2017|
|2015-04-01|regA|prodJ| 46145.55290613044|3279.8238868235567|False|2015|
|2015-01-01|regF|prodM| 71748.77464560093| 7779.618

### Advanced window operations

**Ranking**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

df.withColumn("ranked", f.rank().over(windowSpec)).show()

In [None]:
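df.createOrReplaceTempView("df")  # re-register: the earlier view was created before the `flag` column existed
q = """select dt, geo, prod, val, vol, flag, rank() over (partition by prod order by dt) as ranked from df"""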
spark.sql(q).show()

**Difference from the first value**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

df.withColumn("diff_from_first", df.val - f.first(df.val).over(windowSpec)).show()

In [None]:
q = """select dt, geo, prod, val, vol, flag, val - first(val) over (partition by prod order by dt) as diff_from_first from df"""
spark.sql(q).show()

**Moving average**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt").rowsBetween(-1,1)

df.withColumn("moving_avg", f.avg(df.val).over(windowSpec)).show()

In [None]:
q = """select dt, geo, prod, val, vol, flag, \
avg(val) over (partition by prod order by dt rows between 1 preceding and 1 following) as moving_avg from df"""
spark.sql(q).show()

**Average from the first to the current record**

In [None]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

df.withColumn("avg_from_start", f.avg(df.val).over(windowSpec)).show()

In [None]:
q = """select dt, geo, prod, val, vol, flag, avg(val) over (partition by prod order by dt) as avg_from_start from df"""
spark.sql(q).show()

In [None]:
# another approach
windowSpec = Window.partitionBy("prod").orderBy("dt").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("avg_from_start", f.avg(df.val).over(windowSpec)).show()

In [None]:
q = """select dt, geo, prod, val, vol, flag, \
avg(val) over (partition by prod order by dt rows between unbounded preceding and current row) as avg_from_start from df"""
spark.sql(q).show()

> **TASK 4:**
1. Group the data by date and product, then compare each product's volume with its average volume over the three preceding periods
2. Create a column with a date-based ranking for each product-region combination
3. Compute the difference in volume between consecutive dates for each region
4. Compute the year-over-year difference in value and volume for each product
5. Compute each product's share of the total value in a given year for every region

**Appendix:**

#### User Defined Functions (UDFs)

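(Use only as a last resort: Python UDFs are a black box to the Catalyst optimizer, and every row has to be serialized between the JVM and a Python worker process.)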

In [None]:
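from pyspark.sql.types import IntegerType, StringType, FloatType, DoubleType

In [None]:
def udfPower3(value):
    # cube the input; Spark passes NULL as None, so keep it as NULL
    return None if value is None else value ** 3

In [None]:
udfPower3(3.14)

In [None]:
# DoubleType keeps full precision: `val` cubed reaches ~1e15, beyond FloatType's ~7 significant digits
power3 = f.udf(udfPower3, DoubleType())

In [None]:
df.select(power3(df.val)).show()

In [None]:
spark.udf.register("power3", udfPower3, DoubleType())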

In [None]:
spark.sql("select power3(val) from df").show()

In [None]:
def udfDuplicate_string(x):
    return x + " " + x

In [None]:
udfDuplicate_string("input")

In [None]:
duplicate_string = f.udf(udfDuplicate_string, StringType())

In [None]:
df.select(duplicate_string(df.prod)).show()

In [None]:
spark.udf.register("duplicate_string", udfDuplicate_string, StringType())

In [None]:
spark.sql("select duplicate_string(prod) from df").show()

In [None]:
def udfTwo_col(x,y):
    return x / y

In [None]:
udfTwo_col(10,2)

In [None]:
two_col = f.udf(udfTwo_col, FloatType())

In [None]:
df.select(two_col(df.vol,df.val)).show()

In [None]:
spark.udf.register("two_col", udfTwo_col, FloatType())

In [None]:
spark.sql("select two_col(vol,val) from df").show()