In [1]:
import pyspark
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType 
from pyspark.sql.functions import col, substring
from pyspark.sql.functions import col, asc,desc
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import to_date

In [2]:
# Local Spark session that uses every available core ("local[*]").
# getOrCreate() hands back the already-running session if this cell is re-run.
spark = (
    SparkSession.builder
    .appName('PySpark_Tutorial')
    .master("local[*]")
    .getOrCreate()
)

23/01/12 12:56:07 WARN Utils: Your hostname, alena-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/01/12 12:56:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/12 12:56:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Explicit schema for database.csv. Every column is nullable; the names given
# here override the header names when the file is read.
# NOTE(review): in the sample show() output, a 1747 m / 401 s trip has
# "avg_speed (km/h)" = 0.26, which matches km/min rather than km/h. Verify the unit.
# NOTE(review): the sample shows departure_id as "86" but return_id as "111.0".
# Normalise the ids before comparing or joining on them.
my_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("departure", TimestampType()),
        ("return", TimestampType()),
        ("departure_id", StringType()),
        ("departure_name", StringType()),
        ("return_id", StringType()),
        ("return_name", StringType()),
        ("distance (m)", FloatType()),
        ("duration (sec.)", FloatType()),
        ("avg_speed (km/h)", FloatType()),
        ("departure_latitude", FloatType()),
        ("departure_longitude", FloatType()),
        ("return_latitude", FloatType()),
        ("return_longitude", FloatType()),
        ("Air temperature (degC)", FloatType()),
    ]
])

In [4]:
# Load the raw trips. The first line of the file is a header row, and column
# types come from my_schema rather than from inference.
df_with_schema = spark.read.csv(
    "database.csv",
    schema=my_schema,
    header=True,
)
df_with_schema.printSchema()

root
 |-- departure: timestamp (nullable = true)
 |-- return: timestamp (nullable = true)
 |-- departure_id: string (nullable = true)
 |-- departure_name: string (nullable = true)
 |-- return_id: string (nullable = true)
 |-- return_name: string (nullable = true)
 |-- distance (m): float (nullable = true)
 |-- duration (sec.): float (nullable = true)
 |-- avg_speed (km/h): float (nullable = true)
 |-- departure_latitude: float (nullable = true)
 |-- departure_longitude: float (nullable = true)
 |-- return_latitude: float (nullable = true)
 |-- return_longitude: float (nullable = true)
 |-- Air temperature (degC): float (nullable = true)



In [9]:
# Add a "yyyy-MM" month key. The departure timestamp is implicitly cast to its
# string form ("yyyy-MM-dd HH:mm:ss") and only the first 7 characters are kept.
new_df = (
    df_with_schema
    .withColumn("months", substring(col("departure"), 1, 7))
)
new_df.show(truncate=False)

[Stage 0:>                                                          (0 + 1) / 1]

+-------------------+-------------------+------------+------------------+---------+---------------------+------------+---------------+----------------+------------------+-------------------+---------------+----------------+----------------------+-------+
|departure          |return             |departure_id|departure_name    |return_id|return_name          |distance (m)|duration (sec.)|avg_speed (km/h)|departure_latitude|departure_longitude|return_latitude|return_longitude|Air temperature (degC)|months |
+-------------------+-------------------+------------+------------------+---------+---------------------+------------+---------------+----------------+------------------+-------------------+---------------+----------------+----------------------+-------+
|2020-03-23 06:09:44|2020-03-23 06:16:26|86          |Kuusitie          |111.0    |Esterinportti        |1747.0      |401.0          |0.2613965       |60.195244         |24.9019            |60.19757       |24.92678        |0.9         

                                                                                

In [6]:
# Pull every distinct month key to the driver. Each element is a Row with a
# single `months` field, and the order is not guaranteed.
month = (
    new_df
    .select(col("months"))
    .dropDuplicates()
    .collect()
)
print(month)



[Row(months='2020-06'), Row(months='2020-05'), Row(months='2020-03'), Row(months='2020-04'), Row(months='2020-07'), Row(months='2020-08'), Row(months='2019-04'), Row(months='2020-09'), Row(months='2020-10'), Row(months='2019-05'), Row(months='2019-07'), Row(months='2019-06'), Row(months='2019-08'), Row(months='2019-09'), Row(months='2019-10'), Row(months='2018-05'), Row(months='2018-04'), Row(months='2018-06'), Row(months='2018-08'), Row(months='2018-07'), Row(months='2018-09'), Row(months='2018-10'), Row(months='2017-05'), Row(months='2016-09'), Row(months='2016-10'), Row(months='2016-05'), Row(months='2016-07'), Row(months='2016-06'), Row(months='2016-08'), Row(months='2017-08'), Row(months='2017-06'), Row(months='2017-07'), Row(months='2017-09'), Row(months='2017-10')]


                                                                                

In [7]:
# Write one CSV directory per month: data/months/months=YYYY-MM/part-*.csv.
# repartition("months") hash-partitions by the month key, so all rows of a
# month land in one task and each month directory normally gets one part file.
# mode("overwrite") (previously "append") makes re-running this cell
# idempotent. Appending would add another part file per month and duplicate
# the data. Note that overwrite replaces the whole data/months directory,
# including files renamed by a previous run.
# "csv" is Spark's built-in source; "com.databricks.spark.csv" is only a
# legacy alias for it.
(
    new_df
    .repartition("months")
    .write
    .partitionBy("months")
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save("data/months")
)

                                                                                

In [8]:
# Give each month's CSV part file a coherent name equal to its partition
# directory, e.g.
#   data/months/months=2020-03/part-....csv -> data/months/months=2020-03/months=2020-03
# The new name has no ".csv" extension, as before.
#
# Non-directory entries next to the partition directories (e.g. Spark's
# _SUCCESS marker) are skipped. Previously, os.listdir() crashed on them.
# NOTE(review): the success files are still NOT deleted. Remove them manually
# if they must not be shipped together with the data.

# Same relative location the write cell saves to. This was previously a
# hard-coded absolute path under /home/alena/...
path = 'data/months'
subdirs = os.listdir(path)
for subdir in subdirs:
    filepath = os.path.join(path, subdir)
    if not os.path.isdir(filepath):
        continue  # e.g. the _SUCCESS marker file
    files = os.listdir(filepath)
    csv_files = [name for name in files if os.path.splitext(name)[1] == '.csv']
    # Every .csv file gets the same target name. With more than one, each
    # rename would silently replace the previous file and lose its rows.
    if len(csv_files) > 1:
        raise RuntimeError(
            f"{filepath} contains {len(csv_files)} CSV part files; renaming them "
            "all to the same name would overwrite data. Clear data/months and "
            "re-run the write cell."
        )
    for file in csv_files:
        # Like before, a file already carrying the target name (from an
        # earlier run) is replaced on POSIX systems.
        os.rename(os.path.join(filepath, file), os.path.join(filepath, subdir))

In [10]:
# Metric: how many trips started at each station (grouped by departure_name).
table1 = (
    new_df
    .groupBy(col("departure_name"))
    .count()
    .withColumnRenamed("count", "count_taken")
)

In [11]:
# Metric: how many trips ended at each station (grouped by return_name).
table2 = (
    new_df
    .groupBy(col("return_name"))
    .count()
    .withColumnRenamed("count", "count_returned")
)

In [12]:
# Per-station comparison of bikes taken vs. returned.
# The previous inner join silently dropped stations that occur only as a
# departure or only as a return. The full outer join keeps them, with the
# missing count reported as 0. Joining on a shared column name makes Spark
# merge the two key columns into one.
taken_by_station = table1.withColumnRenamed("departure_name", "station")
returned_by_station = table2.withColumnRenamed("return_name", "station")
result_table = (
    taken_by_station
    .join(returned_by_station, on="station", how="full_outer")
    # Null station names never matched in the old inner join; keep excluding them.
    .where(col("station").isNotNull())
    .fillna(0, subset=["count_taken", "count_returned"])
    # The column name is kept as "departure_name" for compatibility, although
    # it now holds any station name.
    .withColumnRenamed("station", "departure_name")
    .select('departure_name', 'count_taken', 'count_returned')
)
result_table.show()



+--------------------+-----------+--------------+
|      departure_name|count_taken|count_returned|
+--------------------+-----------+--------------+
|         Olarinluoma|       7047|          8163|
|          Marjaniemi|       2882|          2985|
|      Diakoniapuisto|     112022|        100651|
|      Eteläesplanadi|      77197|         83746|
|          Länsituuli|      34177|         33464|
|   Pitäjänmäen asema|       8522|          8100|
|   Vanha Kauppahalli|      31638|         33020|
|            Kuunkatu|      14031|         13405|
|         Puotinharju|       6160|          6123|
|     Herttoniemi (M)|      30771|         30040|
|  Viikin tiedepuisto|      21728|         21896|
|Laivalahden puist...|      11519|         12043|
|             Tietäjä|       9411|          8795|
|     Kirkkoherrantie|       4530|          4372|
|       Revontulentie|      13357|         13163|
|      Siltavoudintie|       4438|          4333|
|            Narinkka|     122355|        121614|


                                                                                