### Start SparkSession

In [13]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Build (or reuse) the local Spark session that the rest of the notebook relies on.
spark = (
    SparkSession.builder
    .appName("halltape_pyspark_local")
    .getOrCreate()
)

# Print the address of the Spark web UI for this session.
print("Активные Spark сессии:", spark.sparkContext.uiWebUrl)

Активные Spark сессии: http://macbookpro:4040


### Read

In [3]:
# Location of the semicolon-delimited customs CSV loaded by the read cells.
PATH = 'data/customs_data.csv'

In [5]:
# Read with default options: no separator or header is given, so every line
# lands in a single `_c0` column (visible in the output).
(
    spark.read
    .csv(PATH)
    .show()
)

+--------------------+
|                 _c0|
+--------------------+
|month;country;cod...|
|01/2016;IT;620469...|
|01/2016;CN;900190...|
|01/2016;BY;841430...|
|01/2016;US;901850...|
|01/2016;EE;902110...|
|01/2016;FR;381600...|
|01/2016;MX;852351...|
|01/2016;JP;620452...|
|01/2016;KR;611020...|
|01/2016;KG;852713...|
|01/2016;ZA;842123...|
|01/2016;CN;851810...|
|01/2016;TR;841790...|
|01/2016;IT;390610...|
|01/2016;CZ;870840...|
|01/2016;ES;640419...|
|01/2016;IT;940490...|
|01/2016;UA;820780...|
|01/2016;CN;330410...|
+--------------------+
only showing top 20 rows



In [6]:
# Same file, now split on ';' with the first line used as column names.
(
    spark.read
    .csv(PATH, header=True, sep=';')
    .show(5)
)

+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|  month|country|      code| value|netto|quantity|region|district|direction_eng|measure_eng|           load_date|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|01/2016|     IT|6204695000|   131|    1|       7| 46000|      01|           IM|        ShT|2024-07-01T00:00:...|
|01/2016|     CN|9001900009|112750|   18|       0| 46000|      01|           IM|          1|2024-01-01T00:00:...|
|01/2016|     BY|8414302004|   392|   57|       8| 50000|      06|           IM|        ShT|2024-06-01T00:00:...|
|01/2016|     US|9018509000| 54349|  179|       0| 40000|      02|           IM|          1|2024-04-01T00:00:...|
|01/2016|     EE|9021101000| 17304|  372|       0| 46000|      01|           IM|          1|2024-02-01T00:00:...|
+-------+-------+----------+------+-----+--------+------+--------+-------------+--------

In [12]:
# Keep the parsed DataFrame around for the following cells.
df = spark.read.csv(PATH, header=True, sep=';')

# Default number of rows, full cell contents.
df.show(truncate=False)

# Two rows, untruncated, printed vertically (one line per column value).
df.show(n=2, truncate=False, vertical=True)

+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|month  |country|code      |value |netto |quantity|region|district|direction_eng|measure_eng|load_date                    |
+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|01/2016|IT     |6204695000|131   |1     |7       |46000 |01      |IM           |ShT        |2024-07-01T00:00:00.000+03:00|
|01/2016|CN     |9001900009|112750|18    |0       |46000 |01      |IM           |1          |2024-01-01T00:00:00.000+03:00|
|01/2016|BY     |8414302004|392   |57    |8       |50000 |06      |IM           |ShT        |2024-06-01T00:00:00.000+03:00|
|01/2016|US     |9018509000|54349 |179   |0       |40000 |02      |IM           |1          |2024-04-01T00:00:00.000+03:00|
|01/2016|EE     |9021101000|17304 |372   |0       |46000 |01      |IM           |1          |2024-02-01T00:00:00.000+03:00|
|01/2016

In [None]:
### PrintSchema

In [13]:
# The reader was given neither a schema nor inferSchema, so every column is reported as string.
df.printSchema()

root
 |-- month: string (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)
 |-- netto: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- direction_eng: string (nullable = true)
 |-- measure_eng: string (nullable = true)
 |-- load_date: string (nullable = true)



In [15]:
# Drop the `_eng` suffix from the two column names that carry it.
result = (
    df
    .withColumnRenamed("direction_eng", "direction")
    .withColumnRenamed("measure_eng", "measure")
)

# Display the resulting column list.
result.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

### Select

In [16]:
# Show ten distinct country codes without truncation.
(
    result
    .select('country')
    .distinct()
    .show(10, truncate=False)
)



+-------+
|country|
+-------+
|LT     |
|MM     |
|DZ     |
|CI     |
|TC     |
|FI     |
|SC     |
|AZ     |
|UA     |
|RO     |
+-------+
only showing top 10 rows



                                                                                

### GroupBy

In [17]:
# Number of rows per country, largest first.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.desc('total_rows'))
    .show()
)



+-------+----------+
|country|total_rows|
+-------+----------+
|     BY|   3509568|
|     KZ|   2519896|
|     CN|   2454792|
|     DE|   1542311|
|     UA|   1158498|
|     IT|   1102837|
|     US|    835936|
|     PL|    666690|
|     FR|    593040|
|     JP|    571756|
|     TR|    463432|
|     KR|    446907|
|     GB|    443091|
|     AM|    438705|
|     CZ|    407360|
|     KG|    403565|
|     ES|    401644|
|     IN|    374151|
|     NL|    365193|
|     UZ|    329707|
+-------+----------+
only showing top 20 rows



                                                                                

### Filter

In [24]:
# Two equivalent ways of writing the same filter.

# 1) Column expressions.
df_de = (
    result
    .filter(F.col('country') == 'DE')
    .filter(F.col('value').isNotNull())
)

# 2) SQL expression strings.
df_de2 = (
    result
    .filter(''' country == "DE" ''')
    .filter(''' value IS NOT NULL ''')
)

# Compare the row counts produced by the two variants.
print(df_de.count() == df_de2.count())



True


                                                                                

### Save to CSV

In [26]:
# Peek at two untruncated rows of the filtered data.
df_de.show(2, truncate=False)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
|month  |country|code      |value|netto|quantity|region|district|direction|measure|load_date                    |
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
|01/2016|DE     |4016995709|5901 |172  |0       |46000 |01      |IM       |1      |2024-01-01T00:00:00.000+03:00|
|01/2016|DE     |8708809109|1213 |94   |0       |45000 |01      |IM       |1      |2024-01-01T00:00:00.000+03:00|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
only showing top 2 rows



In [29]:
# List the column names of the filtered DataFrame.
df_de.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

In [30]:
# Same columns in the same order; only `load_date` changes, being cast
# from string to date in place.
final = df_de.withColumn('load_date', F.col('load_date').cast('date'))

final.show(n=2, truncate=False)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|month  |country|code      |value|netto|quantity|region|district|direction|measure|load_date |
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|01/2016|DE     |4016995709|5901 |172  |0       |46000 |01      |IM       |1      |2024-01-01|
|01/2016|DE     |8708809109|1213 |94   |0       |45000 |01      |IM       |1      |2024-01-01|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
only showing top 2 rows



In [31]:
# All four writes use mode('overwrite'): with the default save mode a second
# run of this cell fails because the output directories already exist.
# `.csv(path)` already selects the CSV format, so no separate `.format('csv')`
# call is needed.

# 1) Number of output files not controlled: it follows the DataFrame's
#    current partitioning.
(
    final
    .write
    .mode('overwrite')
    .options(header=True, sep=';')
    .csv('data/final_no_control')
)

partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

# 2) Number of output files controlled: coalesce to one partition -> ONE FILE.
(
    final
    .coalesce(1)
    .write
    .mode('overwrite')
    .options(header=True, sep=';')
    .csv('data/final_one_file')
)

partition_num = final.coalesce(1).rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')


# 3) Partitioned save: one `load_date=...` sub-directory per distinct value.
(
    final
    .write
    .mode('overwrite')
    .partitionBy('load_date')
    .options(header=True, sep=';')
    .csv('data/final_partitioned')
)

print_df = final.select('load_date').distinct()
print(f'Load_date distinct: {print_df.count()}')


# 4) Partitioned save preceded by repartition(1, 'load_date'): all rows are
#    shuffled into a single DataFrame partition before the partitioned write.
(
    final
    .repartition(1, 'load_date')
    .write
    .mode('overwrite')
    .partitionBy('load_date')
    .options(header=True, sep=';')
    .csv('data/final_partitioned_repart')
)

partition_num = final.repartition(1, 'load_date').rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

                                                                                

Кол-во партиций 16


                                                                                

Кол-во партиций 1


                                                                                

Load_date distinct: 10




Кол-во партиций 1




### Read Transformed

In [33]:
# Restart the session: stop the current one, then create a fresh one.
# The read cells that follow use `spark`, so leaving it stopped would break
# a top-to-bottom run of the notebook.
spark.stop()
spark = SparkSession.builder.appName("halltape_pyspark_local").getOrCreate()

In [68]:
# One reader per output layout written earlier; each keeps only the rows of a
# single load_date so the read cost of the layouts can be compared.
reader_no_control = (
    spark.read
    .csv('data/final_no_control/', sep=';', header=True)
    .filter(''' load_date = "2024-01-01" ''')
)

reader_final_one_file = (
    spark.read
    .csv('data/final_one_file/', sep=';', header=True)
    .filter(''' load_date = "2024-01-01" ''')
)

reader_partitioned = (
    spark.read
    .csv('data/final_partitioned', sep=';', header=True)
    .filter(''' load_date = "2024-01-01" ''')
)

reader_partitioned_repart = (
    spark.read
    .csv('data/final_partitioned_repart', sep=';', header=True)
    .filter(''' load_date = "2024-01-01" ''')
)


# The trailing comments record the read metrics observed for each count.
reader_no_control.count() # number of files read: 16 | size of files read: 88.4 MiB | 2.5 s (90 ms, 301 ms, 384 ms)

reader_final_one_file.count() # number of files read: 1 | size of files read: 88.4 MiB | 3.2 s (306 ms, 407 ms, 420 ms )

reader_partitioned.count() # number of files read: 16 | size of files read: 16.4 MiB | 305 ms (32 ms, 39 ms, 54 ms )

reader_partitioned_repart.count() # number of files read: 1 | size of files read: 16.4 MiB | 179 ms (9 ms, 43 ms, 44 ms )

350998

In [12]:
import os
import sys

# Point both PySpark interpreter variables at the interpreter running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [10]:
# Restart the session: stop the current one and create a new one, because the
# JOIN cell that follows calls `spark.createDataFrame`.
# NOTE(review): presumably the restart is meant to pick up the PYSPARK_PYTHON
# settings from the previous cell — confirm.
spark.stop()
spark = SparkSession.builder.appName("halltape_pyspark_local").getOrCreate()

### JOIN

In [14]:
# Sample (region_id, name) pairs; only the disabled line below would use them.
data = [
    (45000, 'pyspark'),
    (40000, 'airflow'),
    (6000, 'greenplum'),
]

# Disabled: region_df = spark.createDataFrame(data, schema='region_id long, name string')

# Two-row demo frame; keep the value types consistent across rows.
df = spark.createDataFrame(
    data=[(1, "foo"), (2, "bar")],
    schema=["id", "label"],
)

df.show()
# Disabled: mix.join(mix, "id", "inner").show()

                                                                                

+---+-----+
| id|label|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+



In [None]:
# Inner self-join on "id", with the right-hand side marked for broadcast.
# NOTE(review): `mix` is only assigned further down, in the
# "Repartition & Coalesce" section, so this cell raises NameError on a fresh
# top-to-bottom run — TODO move this cell below that section or define `mix` first.
mix.join(F.broadcast(mix), "id", "inner").show()

In [None]:
# Every row of `mix` except the one whose `number` is "six".
# NOTE(review): `mix` is only assigned further down, in the
# "Repartition & Coalesce" section — TODO confirm the intended cell order.
filtered = mix.where(F.col("number") != "six")

# Anti join: keep the rows of `mix` whose "id" has no match in `filtered`.
mix.join(filtered, "id", "anti").show()

### Cache | Persist

In [205]:
# Load the CSV directory at this path as the DataFrame to cache.
df = spark.read.csv('data/final_partitioned_repart', header=True, sep=';')

# cache() only marks the DataFrame for caching; count() is the action run on it.
df.cache().count()

                                                                                

27936

In [208]:
# Drop the cached/persisted data held for `df`.
df.unpersist()

DataFrame[id: string, brand: string, actual_price: string, date: string, load_date: date]

In [207]:
from pyspark.storagelevel import StorageLevel

# Re-read the data so this cell starts from a new DataFrame.
df = spark.read.csv('data/final_partitioned_repart', header=True, sep=';')

# Persist with the DISK_ONLY storage level; count() is the action run on it.
df.persist(StorageLevel.DISK_ONLY).count()

                                                                                

27936

### Functions

In [None]:
# NOTE(review): 'not_york_one_file' is not written anywhere in the visible
# cells — confirm where this dataset comes from.
df = spark.read.csv('not_york_one_file', header=True)

# Distinct brand values in descending order, untruncated.
(
    df
    .select("brand")
    .distinct()
    .orderBy(F.desc("brand"))
    .show(truncate=False)
)

In [None]:
# Lower-cased brand values, de-duplicated; show the first one in ascending order.
# The sort key must be the alias: after select() the only output column is
# `test_col`, so ordering by `brand` refers to a column that is no longer there.
(
    df
    .select(F.lower("brand").alias("test_col"))
    .distinct()
    .orderBy(F.col("test_col").asc())
    .show(1, truncate=False)
)

In [None]:
# Lower-cased distinct brands in ascending order, limited to at most one row.
one_row = (
    df
    .select(F.lower("brand").alias("test_col"))
    .distinct()
    .orderBy(F.asc("test_col"))
    .limit(1)
)

In [None]:
from pyspark.sql.types import *

# Lower-cased distinct brands in ascending order, limited to at most one row.
one_row = (
    df
    .select(F.lower("brand").alias("test_col"))
    .distinct()
    .orderBy(F.asc("test_col"))
    .limit(1)
)

# Target JSON shape: an array of {"key": string, "value": string} structs.
schema = ArrayType(
    StructType([
        StructField("key", StringType()),
        StructField("value", StringType()),
    ])
)

# Parse `test_col` as JSON into `test`, then produce one row per array element.
new_df = (
    one_row
    .withColumn("test", F.from_json("test_col", schema))
    .withColumn("element", F.explode("test"))
)

new_df.show(truncate=False)

### Repartition & Coalesce

In [209]:
# Nine (id, number-word) pairs, with ids starting at 1.
data = list(enumerate(
    ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine'],
    start=1,
))

df = spark.createDataFrame(data, schema=['id', 'number'])

df.show()

+---+------+
| id|number|
+---+------+
|  1|   one|
|  2|   two|
|  3| three|
|  4|  four|
|  5|  five|
|  6|   six|
|  7| seven|
|  8| eight|
|  9|  nine|
+---+------+



In [210]:
# Намеренно перемешаем и поделим на 8 разделов
mix = df.repartition(8)
mix.rdd.glom().collect()

[[Row(id=3, number='three'), Row(id=6, number='six')],
 [],
 [Row(id=1, number='one'), Row(id=2, number='two')],
 [],
 [Row(id=8, number='eight')],
 [Row(id=5, number='five'), Row(id=9, number='nine')],
 [Row(id=4, number='four')],
 [Row(id=7, number='seven')]]

In [211]:
# Repartition the 8-partition frame into 3 partitions and inspect the row placement.
mix.repartition(3).rdd.glom().collect()

[[Row(id=1, number='one'),
  Row(id=4, number='four'),
  Row(id=7, number='seven'),
  Row(id=9, number='nine')],
 [Row(id=2, number='two')],
 [Row(id=3, number='three'),
  Row(id=5, number='five'),
  Row(id=6, number='six'),
  Row(id=8, number='eight')]]

In [212]:
# Coalesce to 3 partitions: in the output, rows that shared a partition in `mix` stay together.
mix.coalesce(3).rdd.glom().collect()

[[Row(id=3, number='three'), Row(id=6, number='six')],
 [Row(id=7, number='seven'),
  Row(id=1, number='one'),
  Row(id=2, number='two'),
  Row(id=5, number='five'),
  Row(id=9, number='nine')],
 [Row(id=8, number='eight'), Row(id=4, number='four')]]

In [213]:
# Convert to a pandas DataFrame and display its first five rows.
mix.toPandas().head()

Unnamed: 0,id,number
0,3,three
1,6,six
2,1,one
3,2,two
4,8,eight
