In [1]:
#   @@@@@@@     @      @    @  @   #
#   @  @  @    @ @      @  @       #
#      @  @   @   @      @@    @   #
#      @     @@@@@@@    @  @   @   #
#      @    @       @  @    @  @   #

Привет, в этой практике мы с вами применим наши знания по PySpark и постараемся изучить что-то новое в процессе выполнения.
<br>В занятии используется датасет собранный на основе данных <a href="https://www.kaggle.com/chicago/chicago-taxi-rides-2016">Chicago Taxi Rides 2016</a>
<br>Полная <a href="https://spark.apache.org/docs/latest/api/python/index.html">документация PySpark</a>.
<br>Схема данны:
<br>|-- taxi_id = идентификатор таксиста
<br>|-- trip_start_timestamp = время начала поездки
<br>|-- trip_end_timestamp = время окончания поездки
<br>|-- trip_seconds = время длительности поездки в секундах
<br>|-- trip_miles = мили проиденные во время поездки
<br>|-- fare = транспортные расходы
<br>|-- tips = назначенные чаевые
<br>|-- trip_total = общая стоимость поездки
<br>|-- payment_type = тип оплаты

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder.appName('PySparkTasks').getOrCreate()

In [3]:
spark.conf.set("spark.sql.session.timeZone", "GMT+3")

In [4]:
spark

Скачайте <a href="https://github.com/AlexKbit/stepik-ds-course/raw/master/Week3/spark-tasks/taxi_data.parquet">taxi_data.parquet</a> и загрузите используя <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame">SparkAPI</a>

In [4]:
df = spark.read.option('header', 'true').parquet('taxi_data.parquet')

№1 Посчитайте количество загруженных строк.

In [6]:
df.count()

2540712

In [8]:
df.show()

+-------+--------------------+-------------------+------------+----------+-----+-----+----------+------------+
|taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles| fare| tips|trip_total|payment_type|
+-------+--------------------+-------------------+------------+----------+-----+-----+----------+------------+
|   5240| 2016-12-15 23:45:00|2016-12-16 00:00:00|         900|       2.5|10.75| 2.45|      14.7| Credit Card|
|   1215| 2016-12-12 07:15:00|2016-12-12 07:15:00|         240|       0.4|  5.0|  3.0|       9.5| Credit Card|
|   3673| 2016-12-16 16:30:00|2016-12-16 17:00:00|        2400|      10.7| 31.0|  0.0|      31.0|        Cash|
|   5400| 2016-12-16 08:45:00|2016-12-16 09:00:00|         300|       0.0| 5.25|  2.0|      7.25| Credit Card|
|   1257| 2016-12-03 18:45:00|2016-12-03 18:45:00|         360|       0.3|  5.0|  0.0|       5.0|        Cash|
|   4666| 2016-12-30 18:00:00|2016-12-30 18:45:00|        2400|      18.2|46.75|10.15|      61.4| Credit Card|
|

In [9]:
df.printSchema()

root
 |-- taxi_id: integer (nullable = true)
 |-- trip_start_timestamp: timestamp (nullable = true)
 |-- trip_end_timestamp: timestamp (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- trip_total: double (nullable = true)
 |-- payment_type: string (nullable = true)



Посмотрим схему данных:

№2 Чему равна корреляция и ковариация между длиной маршрута и ценой за поездку? Ответ округлите до 5 знаков после запятой.
<br>Подробнее <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.corr">corr</a> & <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cov">cov</a>

In [7]:
round(df.corr('trip_miles', 'trip_total'),5)

0.44816

In [8]:
round(df.cov('trip_miles', 'trip_total'),5)

71.96914

№3 Найдите количество, среднее, cреднеквадратическое отклонение, минимум и максимум для длины маршрута и цены за поездку? Ответ округлите до 1 знака после запятой. Подробнее <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe">describe</a>

In [13]:
df.describe(['trip_miles', 'trip_total']).select('summary', f.bround('trip_total', 1).alias('trip_total_desc'),
                                                 f.bround('trip_miles', 1).alias('trip_miles_desc')).show()

+-------+---------------+---------------+
|summary|trip_total_desc|trip_miles_desc|
+-------+---------------+---------------+
|  count|      2540672.0|      2540677.0|
|   mean|           15.9|            3.0|
| stddev|           30.5|            5.3|
|    min|            0.0|            0.0|
|    max|         9276.7|          900.0|
+-------+---------------+---------------+



№4 Найдите самый НЕ популярный вид оплаты.
<br>Подробнее <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy">groupBy</a> <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy">orderBy</a>

In [14]:
df.groupby('payment_type').agg({'taxi_id':'count'}).orderBy('count(taxi_id)').collect()[0][0]

'Way2ride'

№5 Найдите идентификатор таксиста выполнившего наибольшее число заказов.

In [15]:
df.groupby('taxi_id').count().orderBy('count', ascending=False).collect()[0]

Row(taxi_id=316, count=2225)

№6 Чему равна средняя цена среди поездок, оплаченных наличными? Ответ округлите до 5 знака.
<br> Подробней <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where">where</a>

In [16]:
df.groupBy('payment_type').agg({'trip_total':'avg'})\
.where(f.col('payment_type')=='Cash')\
.select('payment_type', f.bround('avg(trip_total)', 5).alias('avg_cash')).show()

+------------+--------+
|payment_type|avg_cash|
+------------+--------+
|        Cash|12.03526|
+------------+--------+



№7 Сколько таксистов проехало больше 1000 миль за все время выполнения заказов?

In [17]:
df.groupBy('taxi_id').agg({'trip_miles':'sum'}).where(f.col('sum(trip_miles)')>1000).count()

2860

№8 Сколько миль проехал пассажир в самой долгой поездке? (Ответ округлите до целого)

In [38]:
df.orderBy(f.col("trip_seconds").desc()).select('trip_miles').collect()[0]

Row(trip_miles=0.0)

№9 Каков средний заработок всех таксистов? Ответ округлите до 5-ого знака.
<br>Отсеките неизвестные машины (не определенный taxi_id).

In [62]:
df.groupBy('taxi_id').agg(f.sum('trip_total').alias('sum_total')).filter('taxi_id is not null')\
.select(f.bround(f.avg(f.col('sum_total')),5)).show()

+-------------------------+
|bround(avg(sum_total), 5)|
+-------------------------+
|               8218.85627|
+-------------------------+



№10 Сколько поездок начиналось в самый загруженный час?
<br>Используйте функцию <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.hour">hour</a>

In [9]:
from pyspark.sql.functions import hour

In [18]:
df.groupBy(hour('trip_start_timestamp')).count().orderBy(f.col('count').desc()).collect()[0]

Row(hour(trip_start_timestamp)=18, count=181127)

№11 Сколько поездок началось во второй четверти дня?

In [31]:
df.groupBy(hour('trip_start_timestamp')).count()\
.filter(f.col('hour(trip_start_timestamp)')>=6).filter(f.col('hour(trip_start_timestamp)')<12)\
.select(f.sum('count')).show()

+----------+
|sum(count)|
+----------+
|    538737|
+----------+



№12 Найдите топ три даты, в которые было суммарно больше всего чаевых? (Чаевые выдаются после совершения поездки)
<br> Ожидаемый формат дат YYYY-MM-DD
<br>Вам может понадобится конвертация типов <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast">cast</a>

In [None]:
from pyspark.sql.types import DateType

In [10]:
df.groupBy(f.to_date('trip_end_timestamp')).agg(f.sum('tips')).orderBy(f.col('sum(tips)').desc()).limit(3).show()

+-----------------------------+------------------+
|to_date(`trip_end_timestamp`)|         sum(tips)|
+-----------------------------+------------------+
|                   2016-11-03|110102.37000000013|
|                   2016-11-09|106187.87999999986|
|                   2016-11-16| 99993.77000000038|
+-----------------------------+------------------+



№13 Сколько было заказов в дату с наибольшим спросом?

In [12]:
df.groupBy(f.to_date('trip_start_timestamp')).count().orderBy(f.col('count').desc()).show(1)

+-------------------------------+-----+
|to_date(`trip_start_timestamp`)|count|
+-------------------------------+-----+
|                     2016-11-03|61259|
+-------------------------------+-----+
only showing top 1 row



Подгрузите данные о марках машин из датасета <a href="https://github.com/AlexKbit/stepik-ds-course/raw/master/Week3/spark-tasks/taxi_cars_data.parquet">taxi_cars_data.parquet</a>

In [13]:
df_car = spark.read.option('header', 'true').parquet('taxi_cars_data.parquet')

In [24]:
df_car.show(5)

+-------+-------------------+
|taxi_id|          car_model|
+-------+-------------------+
|   1159|       Toyota Prius|
|   7273|Ford Crown Victoria|
|   2904|        Honda Civic|
|   3210|        Ford Fusion|
|   2088|       Toyota Camry|
+-------+-------------------+
only showing top 5 rows



№14 Какая марка машины самая распрастранненая среди таксистов?
<br>Подробнее <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.split">split</a>

In [23]:
from pyspark.sql.functions import split

In [23]:
df_car.select(f.split(f.col('car_model'), ' ', 2).getItem(0).alias('model')).show(5)

+------+
| model|
+------+
|Toyota|
|  Ford|
| Honda|
|  Ford|
|Toyota|
+------+
only showing top 5 rows



In [28]:
df_car.groupBy(f.split(f.col('car_model'), ' ', 2).getItem(0).alias('model'))\
.count().orderBy(f.col('count').desc()).limit(1).show()

+-----+-----+
|model|count|
+-----+-----+
| Ford| 1484|
+-----+-----+



№15 Сколько раз и какая модель машин чаще всего встречается в поездках?
<br>Подробнее <a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join">join</a>

In [34]:
ndf = df.join(df_car, df.taxi_id==df_car.taxi_id, 'left_outer').drop(df_car.taxi_id)

In [36]:
ndf.groupBy('car_model')\
.count().orderBy(f.col('count').desc()).limit(1).show()

+-------------------+------+
|          car_model| count|
+-------------------+------+
|Ford Crown Victoria|388682|
+-------------------+------+



Почувствуй силу сжатия! сохрани DataFrame в csv и сравни размеры файлов.

In [37]:
ndf.coalesce(1).write.option('header', 'true').parquet('taxi.parquet')

Теперь загрузите данные из csv и проверьте типы методом printSchema().

In [38]:
df = spark.read.option('header', 'true').parquet('taxi.parquet')
df. printSchema()

root
 |-- taxi_id: integer (nullable = true)
 |-- trip_start_timestamp: timestamp (nullable = true)
 |-- trip_end_timestamp: timestamp (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- trip_total: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- car_model: string (nullable = true)



Не забудьте посетить SparkUI и изучить историю ваших задач.

In [None]:
spark