In [1]:
from pyspark.sql import SparkSession
import os

os.environ['HADOOP_USER_NAME'] = 'n.almazova'

spark = SparkSession.builder \
    .appName('HDFS to data') \
    .master('local[1]') \
    .config('spark.hadoop.fs.defaultFS', f'hdfs://172.17.0.23/user/live_project_b/data')\
    .getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x000001EBADDEF340>


# Чистим данные

### Обработка clients

In [26]:
clients = spark.read.option("multiline", "true")\
    .csv(f"hdfs://172.17.0.23/user/live_project_b/data/clients", header=True, inferSchema=True)

In [27]:
from pyspark.sql.functions import regexp_replace

clean_clients = clients.withColumn('client_address', regexp_replace('client_address', '\n', ' '))


In [28]:
clean_clients.show(truncate=False)

+---------+--------------------+--------------------------+----------------------+-------------------------------------------------------+
|client_id|client_name         |client_email              |client_phone          |client_address                                         |
+---------+--------------------+--------------------------+----------------------+-------------------------------------------------------+
|1        |Tracy Duncan        |eduardoparsons@example.net|385-842-7092          |019 Powers Estate Jessicachester, LA 32674             |
|2        |Amy Phillips        |richardwalker@example.net |001-282-240-3733x3518 |32056 Todd Roads Port Calvinville, SC 98668            |
|3        |Christopher Buchanan|lopezandrea@example.com   |338.327.1666x8437     |768 Davis Wall Apt. 641 East Kimberly, VI 79720        |
|4        |Tyler Johnston      |kevin07@example.net       |001-829-581-6480x690  |Unit 4958 Box 7720 DPO AE 80388                        |
|5        |Gregory Jones   

In [29]:
clean_clients.write.format('csv').option('header', 'true').save('clients') 

### Обработка clients_activities

In [7]:
clients_activities = spark.read.option("multiLine", "true")\
    .csv(f"hdfs://172.17.0.23/user/live_project_b/data/clients_activities", header=True, inferSchema=True)

clients_activities.show(truncate=False)


+---------+--------------------------+--------------+--------------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------+
|client_id|activity_date             |activity_type |activity_location         |ip_address    |device                                                                                                                              |
+---------+--------------------------+--------------+--------------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------+
|1        |2024-08-26 00:10:31.530598|pay_bill      |posts                     |12.252.118.134|Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8 rv:3.0; bn-BD) AppleWebKit/533.42.3 (KHTML, like Gecko) Version/5.1 Safari/533.42.3|
|1        |2024-07-29 19:41:14.530598|view_account  |tags/posts                |12.2

In [8]:
clients_activities.write.format('csv').option('header', 'true').save('clients_activities') 

### Обработка clients_calls_support

In [32]:
clients_calls_support = spark.read.option("multiLine", "true")\
    .csv(f"hdfs://172.17.0.23/user/live_project_b/data/clients_calls_support", header=True, inferSchema=True)

clients_calls_support.show(truncate=False)


+---------+--------------------------+--------+----------+
|client_id|call_date                 |duration|result    |
+---------+--------------------------+--------+----------+
|1        |2024-08-18 05:05:34.530598|1750    |resolved  |
|1        |2024-10-31 00:01:58.530598|88      |unresolved|
|1        |2024-09-30 07:23:00.530598|1303    |unresolved|
|1        |2024-10-23 10:08:29.530598|1242    |unresolved|
|1        |2024-10-09 23:48:39.530598|1102    |resolved  |
|1        |2024-07-22 05:08:03.530598|1399    |unresolved|
|1        |2024-08-01 16:53:18.530598|1149    |resolved  |
|1        |2024-10-26 12:54:05.530598|1027    |unresolved|
|1        |2024-07-12 21:28:53.530598|265     |unresolved|
|1        |2024-09-27 07:05:59.530598|965     |resolved  |
|1        |2024-10-16 12:49:46.530598|1243    |unresolved|
|1        |2024-10-16 04:30:27.530598|1105    |unresolved|
|1        |2024-09-14 07:32:51.530598|1675    |unresolved|
|1        |2024-09-15 16:26:05.530598|177     |resolved 

In [33]:
from pyspark.sql.functions import when, col

clean_clients_calls_support = clients_calls_support.withColumn("result", when(col("result") == "resolved", True).otherwise(False))

# Показать измененный DataFrame
clean_clients_calls_support.show()

+---------+--------------------+--------+------+
|client_id|           call_date|duration|result|
+---------+--------------------+--------+------+
|        1|2024-08-18 05:05:...|    1750|  true|
|        1|2024-10-31 00:01:...|      88| false|
|        1|2024-09-30 07:23:...|    1303| false|
|        1|2024-10-23 10:08:...|    1242| false|
|        1|2024-10-09 23:48:...|    1102|  true|
|        1|2024-07-22 05:08:...|    1399| false|
|        1|2024-08-01 16:53:...|    1149|  true|
|        1|2024-10-26 12:54:...|    1027| false|
|        1|2024-07-12 21:28:...|     265| false|
|        1|2024-09-27 07:05:...|     965|  true|
|        1|2024-10-16 12:49:...|    1243| false|
|        1|2024-10-16 04:30:...|    1105| false|
|        1|2024-09-14 07:32:...|    1675| false|
|        1|2024-09-15 16:26:...|     177|  true|
|        1|2024-09-24 08:23:...|     922| false|
|        1|2024-10-15 21:05:...|    1665| false|
|        1|2024-10-04 21:12:...|    1411|  true|
|        1|2024-09-2

In [31]:
clean_clients_calls_support.printSchema()

root
 |-- client_id: integer (nullable = true)
 |-- call_date: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- result: boolean (nullable = false)



In [34]:
clean_clients_calls_support.write.format('csv').option('header', 'true').save('clients_calls_support') 

### Обработка clients_logins

In [35]:
clients_logins = spark.read.option("header", "true")\
    .csv(f"hdfs://172.17.0.23/user/live_project_b/data/clients_logins", header=True, inferSchema=True, multiLine=True)

clients_logins.show(truncate=False)


+---------+--------------------------+---------------+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|client_id|login_date                |ip_address     |location                                |device                                                                                                                                            |
+---------+--------------------------+---------------+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|1        |2024-02-16 06:51:13.530598|12.252.118.134 |-55.9413759087795, -14.385552165217376  |Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8 rv:3.0; bn-BD) AppleWebKit/533.42.3 (KHTML, like Gecko) Version/5.1 Safari/533.42.3              |
|1        |2024-10-21 20:01:

In [36]:
clients_logins.write.format('csv').option('header', 'true').save('clients_logins') 

### Обработка clients_transactions

In [10]:
clients_transactions = spark.read.option("header", "true")\
    .csv(f"hdfs://172.17.0.23/user/live_project_b/data/clients_transactions", header=True, inferSchema=True, multiLine=True)

clients_transactions.show(truncate=False)

+---------+--------------+--------------------------+----------------+----------------------+--------+---------+
|client_id|transaction_id|transaction_date          |transaction_type|account_number        |currency|amount   |
+---------+--------------+--------------------------+----------------+----------------------+--------+---------+
|1        |234609882     |2024-08-26 09:17:32.530598|pay_bill        |GB28KEXZ85954763023129|USD     |512.34   |
|1        |860612867     |2024-09-23 21:55:51.530598|transfer_funds  |GB47EHXT96036734057340|USD     |3458.34  |
|1        |190083883     |2024-10-23 00:52:23.530598|pay_bill        |GB79WXIO00672741168285|RUB     |238213.27|
|1        |607049149     |2024-10-23 01:48:59.530598|transfer_funds  |GB14IJCQ22626781259740|RUB     |154073.8 |
|1        |616098761     |2024-10-23 01:05:33.530598|pay_bill        |GB02SMPA83475471086901|RUB     |8405356.1|
|1        |400510704     |2024-10-24 01:51:42.530598|pay_bill        |GB79QJMV47644080099275|USD

In [12]:
clients_transactions.count()

288569

In [23]:
clean_clients_transaction = clients_transactions.drop_duplicates(['transaction_id'])
clean_clients_transaction = clean_clients_transaction.repartition(1)

In [24]:
clean_clients_transaction.count()

288529

In [48]:
clean_clients_transaction.show()

+---------+--------------+--------------------+----------------+--------------------+--------+---------+
|client_id|transaction_id|    transaction_date|transaction_type|      account_number|currency|   amount|
+---------+--------------+--------------------+----------------+--------------------+--------+---------+
|     2304|         74775|2024-09-11 19:33:...|  transfer_funds|GB30TFCV796683935...|     RUB|612558.39|
|      996|        239284|2024-09-06 12:56:...|  transfer_funds|GB28EZIZ826251360...|     USD|   563.49|
|     2843|        459259|2024-11-02 03:22:...|  transfer_funds|GB63XOPY227969465...|     RUB|476746.78|
|     2403|        574221|2024-08-30 03:18:...|  transfer_funds|GB09NPTZ672399413...|     USD|   3487.5|
|     1651|       1190318|2024-05-10 02:57:...|  transfer_funds|GB92EZZW959970197...|     USD|  8677.24|
|      211|       1231231|2024-10-20 12:59:...|  transfer_funds|GB41TFEV245616773...|     USD|  1283.02|
|     2620|       1349953|2024-09-20 20:54:...|  transf

In [25]:
clean_clients_transaction.write.format('csv').option('header', 'true').save('clients_transactions') 

### Обработка clients_payments

In [15]:
clients_payments = spark.read.option("header", "true")\
    .csv(f"hdfs://172.17.0.23/user/live_project_b/data/clients_payments", header=True, inferSchema=True, multiLine=True)

clients_payments.show(truncate=False)

+---------+----------+--------------------------+--------+---------+--------------+--------------+
|client_id|payment_id|payment_date              |currency|amount   |payment_method|transaction_id|
+---------+----------+--------------------------+--------+---------+--------------+--------------+
|1        |392158235 |2024-08-26 22:14:46.530598|USD     |512.34   |e_wallet      |234609882     |
|1        |996206114 |2024-09-24 19:21:33.530598|USD     |3458.34  |credit_card   |860612867     |
|1        |586726744 |2024-10-23 20:49:19.530598|RUB     |238213.27|debit_card    |190083883     |
|1        |975252897 |2024-10-23 11:35:38.530598|RUB     |154073.8 |bank_transfer |607049149     |
|1        |914550503 |2024-10-24 01:05:31.530598|RUB     |8405356.1|bank_transfer |616098761     |
|1        |713783565 |2024-10-24 06:21:46.530598|USD     |3635.53  |bank_transfer |400510704     |
|1        |821700960 |2024-10-29 10:43:52.530598|USD     |4669.51  |e_wallet      |570370550     |
|1        

In [39]:
clients_payments.createOrReplaceTempView('payments')
clean_clients_transaction.createOrReplaceTempView('transactions')

In [40]:
clean_clients_payments = spark.sql('select * from payments where exists \
                                   (select amount, transaction_id from transactions \
                                   where payments.amount = transactions.amount and payments.transaction_id = transactions.transaction_id)')

In [41]:
clients_payments.count()

288569

In [42]:
clean_clients_payments.count()

288529

In [43]:
clean_clients_payments = clean_clients_payments.dropDuplicates(['payment_id'])

In [44]:
clean_clients_payments.count()

288474

In [45]:
clean_clients_payments = clean_clients_payments.coalesce(1)

In [47]:
clean_clients_payments.write.format('csv').option('header', 'true').save('clients_payments') 

In [44]:
spark.stop()