### Start SparkSession

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Build (or reuse) the Spark session connected to the standalone master.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists; presumably the settings below are then not re-applied — confirm.
# An alternative configuration with "spark.driver.memory" = "8g" was kept here
# as commented-out code.
spark = (
    SparkSession.builder
    .appName("pyspark_local")
    .master("spark://spark-master:7077")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

sc = spark.sparkContext

# Print the URL of the Spark web UI, then display the context as the cell output.
print("Активные Spark сессии:", sc.uiWebUrl)
sc

25/07/17 18:30:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Активные Spark сессии: http://16ecfb548f4b:4040


In [2]:
# Read the effective memory settings back from the running context.
spark_conf = spark.sparkContext.getConf()

driver_memory = spark_conf.get("spark.driver.memory")
print(f"Сконфигурированная память драйвера: {driver_memory}")

executor_memory = spark_conf.get("spark.executor.memory")
print(f"Сконфигурированная память исполнителя: {executor_memory}")

# This key is not set in the builder above, so get() yields None here.
pyspark_executor_memory = spark_conf.get("spark.executor.pyspark.memory")
print(f"Сконфигурированная память PySpark для исполнителя: {pyspark_executor_memory}")

Сконфигурированная память драйвера: 8g
Сконфигурированная память исполнителя: 2g
Сконфигурированная память PySpark для исполнителя: None


### Read

In [3]:
# Relative path of the source CSV file used by the read examples below.
PATH = 'data/orders.csv'

In [4]:
# Read with default options: columns come out as _c0.._c11 and the header line appears as a data row.
spark.read.csv(PATH).show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+------+------------+
|              _c0|                 _c1|                 _c2|                 _c3|                 _c4|                 _c5|                 _c6|       _c7|                 _c8|                 _c9|  _c10|        _c11|
+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+------+------------+
|             name|               email|             address|   registration_date|               phone|             company|                 job| birthdate|             country|                uuid|salary|country_code|
|Jermaine Espinoza|  fbaker@example.org|634 Gonzalez Inle...|2024-06-01T00:00:...| +1-605-744-8478x494|           Baker Ltd|

                                                                                

In [5]:
# Same file, now with an explicit separator and the first line used as column names; show 5 rows.
spark.read.csv(PATH, sep=',', header=True).show(5)

                                                                                

+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-----------------+--------------------+------+------------+
|             name|               email|             address|   registration_date|               phone|             company|                 job| birthdate|          country|                uuid|salary|country_code|
+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-----------------+--------------------+------+------------+
|Jermaine Espinoza|  fbaker@example.org|634 Gonzalez Inle...|2024-06-01T00:00:...| +1-605-744-8478x494|           Baker Ltd| Broadcast presenter|1985-01-12|       Azerbaijan|9c482b2e-b191-446...| 10350|          HT|
|    Lauren Thomas|alfredwaters@exam...|26631 Jill Juncti...|2024-03-12T00:00:...|        722.209.9393|   Castillo-Valencia|      Dramat

In [6]:
# Base DataFrame for the following cells; no schema is supplied, so every column is read as string.
df = spark.read.csv(PATH, sep=',', header=True)

In [7]:
# Print column names and types of the raw DataFrame.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- country_code: string (nullable = true)



In [8]:
# Rename two columns; `df` itself is not modified, the renamed frame is bound to `result`.
result = (
    df
    .withColumnRenamed("name", "full_name")
    .withColumnRenamed("phone", "phone_number")
)

# Confirm the new names both in the schema and in the plain column list.
result.printSchema()
result.columns

root
 |-- full_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- country_code: string (nullable = true)



['full_name',
 'email',
 'address',
 'registration_date',
 'phone_number',
 'company',
 'job',
 'birthdate',
 'country',
 'uuid',
 'salary',
 'country_code']

In [9]:
# First 5 rows with full (untruncated) cell contents.
result.show(n=5, truncate=False)

+-----------------+------------------------+---------------------------------------------------------+-----------------------------+----------------------+--------------------------+-----------------------------+----------+-----------------+------------------------------------+------+------------+
|full_name        |email                   |address                                                  |registration_date            |phone_number          |company                   |job                          |birthdate |country          |uuid                                |salary|country_code|
+-----------------+------------------------+---------------------------------------------------------+-----------------------------+----------------------+--------------------------+-----------------------------+----------+-----------------+------------------------------------+------+------------+
|Jermaine Espinoza|fbaker@example.org      |634 Gonzalez Inlet Suite 238, South Trevormouth, VA 19774|2

In [10]:
# First 2 rows, untruncated, printed vertically (one "column | value" line per field).
result.show(n=2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------
 full_name         | Jermaine Espinoza                                         
 email             | fbaker@example.org                                        
 address           | 634 Gonzalez Inlet Suite 238, South Trevormouth, VA 19774 
 registration_date | 2024-06-01T00:00:00.000+03:00                             
 phone_number      | +1-605-744-8478x494                                       
 company           | Baker Ltd                                                 
 job               | Broadcast presenter                                       
 birthdate         | 1985-01-12                                                
 country           | Azerbaijan                                                
 uuid              | 9c482b2e-b191-4462-bee4-b5dda215e792                      
 salary            | 10350                                                     
 country_code      | HT                 

### PrintSchema

In [11]:
# Schema of the raw DataFrame before any casts: all columns are still strings.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- country_code: string (nullable = true)



In [12]:
# Rebuild `result` from the raw frame: rename two columns and cast both date columns to `date`.
result = (
    df
    .withColumnRenamed("name", "first_name")
    .withColumnRenamed("phone", "phone_number")
    .withColumn('registration_date', F.col('registration_date').cast('date'))
    .withColumn('birthdate', F.col('birthdate').cast('date'))
)

result.printSchema()

root
 |-- first_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- country_code: string (nullable = true)



In [13]:
# Distinct values of the (still string-typed) salary column, first 20 shown.
(
    df
    .select('salary')
    .distinct()
    .show(20, truncate=False)
)

                                                                                

+------+
|salary|
+------+
|16250 |
|3650  |
|7650  |
|4850  |
|3200  |
|6900  |
|10250 |
|14000 |
|2700  |
|12750 |
|6850  |
|2550  |
|13700 |
|5450  |
|18750 |
|17100 |
|3250  |
|11600 |
|2200  |
|15100 |
+------+
only showing top 20 rows



In [14]:
# Final typed frame: renamed columns, date casts, and salary cast to long.
result = (
    df
    .withColumnRenamed("name", "first_name")
    .withColumnRenamed("phone", "phone_number")
    .withColumn('registration_date', F.col('registration_date').cast('date'))
    .withColumn('birthdate', F.col('birthdate').cast('date'))
    .withColumn('salary', F.col('salary').cast('long'))
)

result.printSchema()

# Distinct salaries again, now on the numeric column.
(
    result
    .select('salary')
    .distinct()
    .show(20, truncate=False)
)

root
 |-- first_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- country_code: string (nullable = true)



                                                                                

+------+
|salary|
+------+
|2250  |
|3800  |
|16100 |
|5850  |
|17150 |
|9750  |
|6700  |
|7800  |
|17100 |
|14450 |
|2900  |
|4300  |
|2400  |
|10000 |
|11700 |
|3200  |
|13200 |
|11450 |
|16900 |
|12300 |
+------+
only showing top 20 rows



In [15]:
# result = df \
#     .withColumnRenamed("registration_date", "registered_at") \
#     .withColumnRenamed("birthdate", "dob") \
#     .withColumnRenamed("country_code", "iso_code")

# result.columns


In [16]:
# result.printSchema()

### Select

In [17]:
# List the distinct registration dates present in the data.
(
    result
    .select('registration_date')
    .distinct()
    .show(truncate=False)
)

                                                                                

+-----------------+
|registration_date|
+-----------------+
|2024-08-30       |
|2024-09-03       |
|2024-03-12       |
|2024-05-29       |
|2024-02-23       |
|2024-01-10       |
|2024-04-04       |
|2024-06-01       |
+-----------------+



### GroupBy

In [18]:
# Row count per country, ordered by country name in descending order.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.col('country').desc())
    .show(truncate=False)
)

[Stage 29:>                                                         (0 + 8) / 8]

+------------------------------------+----------+
|country                             |total_rows|
+------------------------------------+----------+
|Zimbabwe                            |14584     |
|Zambia                              |14804     |
|Yemen                               |14737     |
|Western Sahara                      |14732     |
|Wallis and Futuna                   |14573     |
|Vietnam                             |14750     |
|Venezuela                           |14771     |
|Vanuatu                             |14686     |
|Uzbekistan                          |14663     |
|Uruguay                             |14678     |
|United States of America            |14796     |
|United States Virgin Islands        |14683     |
|United States Minor Outlying Islands|14617     |
|United Kingdom                      |14709     |
|United Arab Emirates                |14762     |
|Ukraine                             |14702     |
|Uganda                              |14987     |


                                                                                

In [19]:
# Column names of the typed frame, displayed as the cell output.
result.columns

['first_name',
 'email',
 'address',
 'registration_date',
 'phone_number',
 'company',
 'job',
 'birthdate',
 'country',
 'uuid',
 'salary',
 'country_code']

In [20]:
# Show the maximum and minimum salary (columns max_salary, min_salary).
salary_bounds = result.agg(
    F.max('salary').alias('max_salary'),
    F.min('salary').alias('min_salary'),
)
salary_bounds.show(truncate=False)

[Stage 31:>                                                         (0 + 8) / 8]

+----------+----------+
|max_salary|min_salary|
+----------+----------+
|20000     |2000      |
+----------+----------+



                                                                                

### Filter

In [21]:
# Two equivalent ways of writing the same filter.

# 1) Column-expression API.
# Parenthesized chain: the original ended with a dangling line-continuation
# backslash that only parsed because a blank line happened to follow it.
df_de_salary = (
    result
    .where(F.col("country") == "Korea")
    .where(F.col("salary").isNotNull())
)

# 2) SQL-string predicates.
df_de_salary2 = (
    result
    .where('country = "Korea"')
    .where('salary IS NOT NULL')
)

# Every count() launches a Spark job, so compute each count once and reuse it
# for both the printout and the equality check.
count_expr_api = df_de_salary.count()
count_sql_api = df_de_salary2.count()

print('df_de_salary', count_expr_api)
print('df_de_salary2', count_sql_api)
print(count_expr_api == count_sql_api)

                                                                                

df_de_salary 29183


                                                                                

df_de_salary2 29183


[Stage 39:>                                                         (0 + 8) / 8]

True


                                                                                

### Save to CSV

In [22]:
# Schema of the filtered frame that is about to be saved.
df_de_salary2.printSchema()

root
 |-- first_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- country_code: string (nullable = true)



In [23]:
# Peek at the first 2 filtered rows without truncating cell contents.
df_de_salary.show(2, truncate=False)

+-----------------+------------------------+---------------------------------------------+-----------------+-----------------+---------------------------+-----------------------------+----------+-------+------------------------------------+------+------------+
|first_name       |email                   |address                                      |registration_date|phone_number     |company                    |job                          |birthdate |country|uuid                                |salary|country_code|
+-----------------+------------------------+---------------------------------------------+-----------------+-----------------+---------------------------+-----------------------------+----------+-------+------------------------------------+------+------------+
|Rhonda Foley     |jonathan92@example.org  |1234 Cox Light Suite 381, Brownfurt, NV 24776|2024-08-30       |513-361-0719     |White-Oneill               |Academic librarian           |2006-08-07|Korea  |757b9bd8-1aa7-

In [24]:
# Column names available for the projection in the next cell.
df_de_salary.columns

['first_name',
 'email',
 'address',
 'registration_date',
 'phone_number',
 'company',
 'job',
 'birthdate',
 'country',
 'uuid',
 'salary',
 'country_code']

In [25]:
# Projection that gets written to disk: five columns as-is plus the registration
# date exposed under the shorter name `reg_date`.
reg_date_col = F.col('registration_date').cast('date').alias('reg_date')
final = df_de_salary.select('salary', 'job', 'company', 'first_name', 'email', reg_date_col)

final.show(20, truncate=False)


+------+-----------------------------------+-----------------------------+-------------------+--------------------------------+----------+
|salary|job                                |company                      |first_name         |email                           |reg_date  |
+------+-----------------------------------+-----------------------------+-------------------+--------------------------------+----------+
|2300  |Academic librarian                 |White-Oneill                 |Rhonda Foley       |jonathan92@example.org          |2024-08-30|
|14000 |Development worker, community      |Parker, Mcdowell and Moreno  |Christine Hopkins  |davisvanessa@example.com        |2024-09-03|
|19900 |Stage manager                      |Dean-Hubbard                 |Thomas Schultz     |byrdjean@example.org            |2024-08-30|
|8300  |Hydrogeologist                     |Graves Group                 |Marissa Price      |ugates@example.org              |2024-09-03|
|3400  |Dance movement psyc

In [26]:
import shutil

# Remove the output of previous runs so the save cells below can write again.
# On a fresh checkout none of these directories exist yet; a bare rmtree()
# would raise FileNotFoundError and stop "Run All", so a missing directory is
# treated as already clean. Any other error (e.g. permissions) still surfaces.
for output_dir in (
    'data/final_no_control',
    'data/final_one_file',
    'data/final_partitioned',
    'data/final_partitioned_repart',
    'data/final_partitioned_coalesce',
):
    try:
        shutil.rmtree(output_dir)
    except FileNotFoundError:
        pass

In [27]:
# Сохранение неконтроллируемое по кол-ву файлов
final\
    .write\
    .format('csv')\
    .options(header='True', sep=';')\
    .csv('data/final_no_control')

partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

                                                                                

Кол-во партиций 8


In [28]:
# Сохранение контроллируемое по кол-ву файлов - ОДИН ФАЙЛ
final\
    .coalesce(1)\
    .write\
    .format('csv')\
    .options(header='True', sep=';')\
    .csv('data/final_one_file') 

partition_num = final.coalesce(1).rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

[Stage 44:>                                                         (0 + 1) / 1]

Кол-во партиций 1


                                                                                

In [29]:
# Сохранения с партицированием
final\
    .write\
    .partitionBy('reg_date')\
    .format('csv')\
    .options(header='True', sep=';')\
    .csv('data/final_partitioned')

print_df = final.select('reg_date').distinct()
print(f'Load_date distinct: {print_df.count()}')

[Stage 46:>                                                         (0 + 8) / 8]

Load_date distinct: 8


                                                                                

In [30]:
# Сохранения с партицированием и repartition внутри самой партиции
final\
    .repartition(1, 'reg_date')\
    .write\
    .partitionBy('reg_date')\
    .format('csv')\
    .options(header='True', sep=';')\
    .csv('data/final_partitioned_repart')

partition_num = final.repartition(1, 'reg_date').rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

                                                                                

Кол-во партиций 1


In [31]:
# coalesce vs repartition

coalesced = final.coalesce(1)
(
    coalesced.write
    .partitionBy('reg_date')
    .format('csv')
    .options(header='True', sep=';')
    .csv('data/final_partitioned_coalesce')
)

# Report the partition count of the DataFrame that was actually written.
# The original measured `final.repartition(1, 'reg_date')` here, a copy-paste
# leftover from the previous cell that did not describe the coalesce() path.
partition_num = coalesced.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

# repartition - redistributes the data evenly; performs a shuffle
# coalesce - merges the data as it is

                                                                                

Кол-во партиций 1


### Read Transformed

In [32]:
def _rows_for_reg_date(path):
    """Read a ';'-separated CSV dataset with a header and keep rows whose reg_date is 2024-01-10."""
    return spark.read.csv(path, header=True, sep=';').where(''' reg_date = "2024-01-10" ''')


# The same date filter applied to each of the saved layouts.
reader_no_control = _rows_for_reg_date('data/final_no_control/')
reader_final_one_file = _rows_for_reg_date('data/final_one_file/')
reader_partitioned = _rows_for_reg_date('data/final_partitioned')
reader_partitioned_repart = _rows_for_reg_date('data/final_partitioned_repart')

# One count per line, in the same order as the readers above.
print(reader_no_control.count())
print(reader_final_one_file.count())
print(reader_partitioned.count())
print(reader_partitioned_repart.count())

3598
3598
3598
3598


### JOIN

In [33]:
# Lookup table of salary bands: (lower bound, upper bound, label).
data = [
    (0, 5000, "низкая"),
    (5001, 10000, "средняя"),
    (10001, 20000, "высокая"),
    (20001, 999999, "очень высокая"),
]
levels_schema = "min_salary long, max_salary long, level string"

salary_levels_df = spark.createDataFrame(data, schema=levels_schema)
salary_levels_df.show()

# Re-read the orders file, this time letting Spark infer the column types.
order_data = spark.read.csv('data/orders.csv', header=True, sep=',', inferSchema=True)
order_data.show(4)

+----------+----------+-------------+
|min_salary|max_salary|        level|
+----------+----------+-------------+
|         0|      5000|       низкая|
|      5001|     10000|      средняя|
|     10001|     20000|      высокая|
|     20001|    999999|очень высокая|
+----------+----------+-------------+



[Stage 69:>                                                         (0 + 8) / 8]

+-----------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+----------+-----------------+--------------------+------+------------+
|             name|               email|             address|  registration_date|               phone|             company|                 job| birthdate|          country|                uuid|salary|country_code|
+-----------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+----------+-----------------+--------------------+------+------------+
|Jermaine Espinoza|  fbaker@example.org|634 Gonzalez Inle...|2024-05-31 21:00:00| +1-605-744-8478x494|           Baker Ltd| Broadcast presenter|1985-01-12|       Azerbaijan|9c482b2e-b191-446...| 10350|          HT|
|    Lauren Thomas|alfredwaters@exam...|26631 Jill Juncti...|2024-03-11 21:00:00|        722.209.9393|   Castillo-Valencia|      Dramatherap

                                                                                

In [34]:
# Disable automatic broadcast joins (threshold -1); `time` is imported for the timing cells below.
import time
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [35]:
# Time the range join with automatic broadcast disabled.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

start_time = time.time()

order_data = order_data.withColumn("salary", F.col("salary").cast("long"))

# A salary matches a band when it lies between the band's bounds (inclusive).
above_lower_bound = order_data.salary >= salary_levels_df.min_salary
below_upper_bound = order_data.salary <= salary_levels_df.max_salary
joined = order_data.join(
    salary_levels_df,
    on=above_lower_bound & below_upper_bound,
    how="left",
)

# count() is the action that falls inside the timed window; show() runs after it.
joined.count()
end_time = time.time()
print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")
joined.show()

                                                                                

Elapsed time for join operation: 0.89 seconds
+-----------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+------+------------+----------+----------+-------+
|             name|               email|             address|  registration_date|               phone|             company|                 job| birthdate|             country|                uuid|salary|country_code|min_salary|max_salary|  level|
+-----------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+------+------------+----------+----------+-------+
|Jermaine Espinoza|  fbaker@example.org|634 Gonzalez Inle...|2024-05-31 21:00:00| +1-605-744-8478x494|           Baker Ltd| Broadcast presenter|1985-01-12|          Azerbaijan|9c482b2e-b191-446...| 10350|      

In [36]:
# Time the same range join with an explicit broadcast hint.

# Restore the default auto-broadcast threshold. The original set it to 1, which
# is a size of one byte: that keeps auto-broadcast effectively disabled and
# leaves the session in that state for every later cell. The explicit
# F.broadcast() hint below is what requests the broadcast.
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")

start_time = time.time()

order_data = order_data.withColumn("salary", F.col("salary").cast("long"))

joined = order_data.join(
    F.broadcast(salary_levels_df),
    (order_data.salary >= salary_levels_df.min_salary) & (order_data.salary <= salary_levels_df.max_salary),
    how="left"
)

# count() is the action being timed.
joined.count()

end_time = time.time()

print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")


[Stage 77:>                                                         (0 + 8) / 8]

Elapsed time for broadcast join operation: 2.20 seconds


                                                                                

### Cache | Persist

In [37]:
# Mark the frame for caching and run count() as the action right after it.
order_data.cache().count()

                                                                                

3600000

In [38]:
# Drop the cached data again so the next cell can choose a different storage level.
order_data.unpersist()

DataFrame[name: string, email: string, address: string, registration_date: timestamp, phone: string, company: string, job: string, birthdate: string, country: string, uuid: string, salary: bigint, country_code: string]

In [39]:
from pyspark.storagelevel import StorageLevel

# Persist with the DISK_ONLY storage level and run count() as the action right after it.
order_data.persist(StorageLevel.DISK_ONLY).count()

                                                                                

3600000

### Repartition & Coalesce

In [40]:
# Tiny 9-row frame used to make partition contents visible in the next cells.
# Note: this rebinds both `data` and `df`, which held other objects earlier.
data = [
    (1, 'one'), (2, 'two'), (3, 'three'),
    (4, 'four'), (5, 'five'), (6, 'six'),
    (7, 'seven'), (8, 'eight'), (9, 'nine'),
]
column_names = ['id', 'number']

df = spark.createDataFrame(data, column_names)

df.show()

+---+------+
| id|number|
+---+------+
|  1|   one|
|  2|   two|
|  3| three|
|  4|  four|
|  5|  five|
|  6|   six|
|  7| seven|
|  8| eight|
|  9|  nine|
+---+------+



In [41]:
# Deliberately shuffle the rows and split them into 4 partitions
# (glom() groups the rows by partition so collect() returns one list per partition).
mix = df.repartition(4)
mix.rdd.glom().collect()

[[Row(id=9, number='nine')],
 [],
 [],
 [Row(id=3, number='three'),
  Row(id=7, number='seven'),
  Row(id=5, number='five'),
  Row(id=2, number='two'),
  Row(id=1, number='one'),
  Row(id=6, number='six'),
  Row(id=4, number='four'),
  Row(id=8, number='eight')]]

In [42]:
# Go from 4 to 3 partitions with repartition() and show the rows per partition.
mix.repartition(3).rdd.glom().collect()

[[Row(id=4, number='four'),
  Row(id=9, number='nine'),
  Row(id=5, number='five'),
  Row(id=6, number='six')],
 [Row(id=2, number='two'), Row(id=1, number='one')],
 [Row(id=3, number='three'),
  Row(id=7, number='seven'),
  Row(id=8, number='eight')]]

In [43]:
# Go from 4 to 3 partitions with coalesce() and show the rows per partition, for comparison.
mix.coalesce(3).rdd.glom().collect()

[[Row(id=9, number='nine')],
 [],
 [Row(id=3, number='three'),
  Row(id=7, number='seven'),
  Row(id=2, number='two'),
  Row(id=6, number='six'),
  Row(id=4, number='four'),
  Row(id=8, number='eight'),
  Row(id=1, number='one'),
  Row(id=5, number='five')]]

In [44]:
# mix.toPandas().head()

In [45]:
# OUT OF MEMORY
# The file is read with a tab separator, so each comma-separated line ends up in a
# single column; collect() then returns every row as a Python list.
# NOTE(review): presumably a deliberate demonstration of driver memory pressure — confirm.

d = spark.read.csv('data/orders.csv', header=True, sep='\t')
d.collect()

                                                                                

[Row(name,email,address,registration_date,phone,company,job,birthdate,country,uuid,salary,country_code='Jermaine Espinoza,fbaker@example.org,"634 Gonzalez Inlet Suite 238, South Trevormouth, VA 19774",2024-06-01T00:00:00.000+03:00,+1-605-744-8478x494,Baker Ltd,Broadcast presenter,1985-01-12,Azerbaijan,9c482b2e-b191-4462-bee4-b5dda215e792,10350,HT'),
 Row(name,email,address,registration_date,phone,company,job,birthdate,country,uuid,salary,country_code='Lauren Thomas,alfredwaters@example.com,"26631 Jill Junction, Sandersborough, GA 60912",2024-03-12T00:00:00.000+03:00,722.209.9393,Castillo-Valencia,Dramatherapist,1965-10-31,Barbados,ca0d3252-e65c-43aa-b557-8d5efe5f60bb,8000,TF'),
 Row(name,email,address,registration_date,phone,company,job,birthdate,country,uuid,salary,country_code='James Gutierrez,carmen78@example.com,"01077 Wilson Viaduct, West Heather, NV 03145",2024-09-03T00:00:00.000+03:00,001-458-508-9427x3274,"Dean, Hall and Simmons","Development worker, community",1993-10-28,Kazak

In [46]:
# Accumulator for the collected copies appended by the loop in the next cell.
dl = list()

In [None]:
# Keep appending a full collect() of the file to `dl`. The loop has no exit
# condition, so it ends only when an exception is raised or the kernel is interrupted.
# NOTE(review): presumably an intentional out-of-memory demonstration — confirm before running.
try:
    while True:
        dl.append(spark.read.csv('data/orders.csv', header=True, sep='\t').collect())
finally:
    # Runs even when the loop is aborted: report how many copies were collected.
    print(len(dl))

                                                                                

In [None]:
# spark.stop()

In [None]:
# import os
# import sys

# sys.executable