### 1. Формирование CSV-файлов 
По данным из Elasticsearch сформировать CSV-файлы (с внутренней схемой) таблиц «Пассажир», «Билет», «Поезд» и сохранить их в файловой системе HDFS.

In [1]:
# findspark.init() has to run before the first pyspark import: it locates the
# Spark installation and makes its Python bindings importable. Calling it after
# pyspark has already been imported (as before) has no useful effect.
import findspark
findspark.init()

from elasticsearch import Elasticsearch
from pyspark.sql import SparkSession
# Wildcard import kept as-is: the schema cell relies on StructType, StructField
# and the *Type classes being in the notebook namespace.
from pyspark.sql.types import *

In [2]:
# Address of the Elasticsearch node that holds the "train" and "ticket" indices.
ES_URL = "http://localhost:9200"

client = Elasticsearch(ES_URL)
# Connectivity check; the boolean result is displayed as the cell output.
client.ping()

True

In [3]:
# Configure the application name first, then obtain the session
# (getOrCreate returns the already-running session if there is one).
spark_builder = SparkSession.builder.appName("csv4")
sparkSession = spark_builder.getOrCreate()

24/05/05 21:06:06 WARN Utils: Your hostname, timerlan-IdeaPad-Gaming-3-15IMH05 resolves to a loopback address: 127.0.1.1; using 192.168.14.252 instead (on interface enp12s0)
24/05/05 21:06:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/05 21:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Query that matches every document of an index (reused by the ticket cell).
search_query = {
    "match_all": {}
}

# Keep the raw response under its own name instead of rebinding `trains`
# from "response" to "list of hits".
trains_response = client.search(index="train", query=search_query, size=100)
trains = trains_response['hits']['hits']

# A single search returns at most `size` hits. Fail loudly instead of silently
# exporting a truncated table when the index holds more documents than that.
trains_total = trains_response['hits']['total']
if isinstance(trains_total, dict):  # newer servers report {'value': ..., 'relation': ...}
    trains_total = trains_total['value']
assert len(trains) == trains_total, (
    f"fetched {len(trains)} of {trains_total} 'train' documents; "
    "increase size or paginate"
)

# Show one raw hit to document the structure the later cells rely on.
trains[0]

{'_index': 'train',
 '_id': '1',
 '_score': 1.0,
 '_source': {'train_number': 856234,
  'train_info': 'люкс плацкарт св 2011-01-01T12:34 2011-01-03T22:55',
  'departure': 'Галич',
  'arrive': 'Кировск (Ленин.)',
  'tickets_sold': 3,
  'tickets_remain': 225}}

In [5]:
# Keep the raw response under its own name instead of rebinding `tickets`
# from "response" to "list of hits".
tickets_response = client.search(index="ticket", query=search_query, size=100)
tickets = tickets_response['hits']['hits']

# A single search returns at most `size` hits. Fail loudly instead of silently
# exporting a truncated table when the index holds more documents than that.
tickets_total = tickets_response['hits']['total']
if isinstance(tickets_total, dict):  # newer servers report {'value': ..., 'relation': ...}
    tickets_total = tickets_total['value']
assert len(tickets) == tickets_total, (
    f"fetched {len(tickets)} of {tickets_total} 'ticket' documents; "
    "increase size or paginate"
)

# Show one raw hit to document the structure the later cells rely on.
tickets[0]

{'_index': 'ticket',
 '_id': '1',
 '_score': 1.0,
 '_source': {'user_id': 'd54eb390-46b3-47ff-aeaa-96879e3fbb92',
  'personal': 'Галина Валентиновна Николаева',
  'date_purchase': '2015-05-25T19:47:14',
  'price': 15444.95,
  'train_id': 2}}

In [18]:
# Explicit schemas for the three exported tables. Every column is declared
# non-nullable (third argument of add() is the `nullable` flag).

# "Train" table: train_id comes from the Elasticsearch document id,
# the remaining columns from the document source.
TrainSchema = (
    StructType()
    .add("train_id", IntegerType(), False)
    .add("train_number", IntegerType(), False)
    .add("train_info", StringType(), False)
    .add("departure", StringType(), False)
    .add("arrive", StringType(), False)
    .add("tickets_sold", IntegerType(), False)
    .add("tickets_remain", IntegerType(), False)
)

# "Ticket" table: the purchase date is loaded as a raw string first
# and converted to a timestamp column in a later cell.
TicketSchema = (
    StructType()
    .add("ticket_id", IntegerType(), False)
    .add("user_id", StringType(), False)
    .add("train_id", IntegerType(), False)
    .add("date_purchase_str", StringType(), False)
    .add("price", FloatType(), False)
)

# "Passenger" table: one row per distinct user_id found in the tickets.
PassengerSchema = (
    StructType()
    .add("user_id", StringType(), False)
    .add("personal", StringType(), False)
)

# Row buffers (lists of tuples) that the following cells turn into DataFrames.
TrainTab, TicketTab, PassengerTab = [], [], []

In [19]:
# Rebuild the row list from scratch on every run. Appending to a list created
# in another cell made this cell non-idempotent: each re-run added the same
# trains again and produced duplicate rows in TrainDF.
TrainTab = [
    (
        int(hit['_id']),  # Elasticsearch ids are strings; the schema expects an integer
        hit['_source']['train_number'],
        hit['_source']['train_info'],
        hit['_source']['departure'],
        hit['_source']['arrive'],
        hit['_source']['tickets_sold'],
        hit['_source']['tickets_remain'],
    )
    for hit in trains
]
TrainDF = sparkSession.createDataFrame(TrainTab, TrainSchema)
TrainDF.show(5)

+--------+------------+--------------------+----------+----------------+------------+--------------+
|train_id|train_number|          train_info| departure|          arrive|tickets_sold|tickets_remain|
+--------+------------+--------------------+----------+----------------+------------+--------------+
|       1|      856234|люкс плацкарт св ...|     Галич|Кировск (Ленин.)|           3|           225|
|       2|      305824|плацкарт купе 201...| Калачинск|      Белокуриха|           5|           937|
|       3|      646593|купе плацкарт св ...|  Объячево|          Терней|           1|           467|
|       4|      805760|св люкс купе плац...|   Чусовой|          Котлас|           2|           614|
|       5|      230075|купе св 2019-10-2...|Лабытнанги|          Брянск|           1|           673|
+--------+------------+--------------------+----------+----------------+------------+--------------+
only showing top 5 rows



In [24]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col

In [29]:
# Start from empty row lists on every run. Appending to lists created in
# another cell made this cell non-idempotent: each re-run added every ticket
# and passenger again and produced duplicate rows.
TicketTab = []
PassengerTab = []
userSet = set()  # user_ids that already have a row in PassengerTab

for ticket in tickets:
    source = ticket['_source']
    TicketTab.append((
        int(ticket['_id']),  # Elasticsearch ids are strings; the schema expects an integer
        source['user_id'],
        source['train_id'],
        source['date_purchase'],
        source['price'],
    ))
    # A passenger may hold several tickets; emit each user_id only once,
    # taking the personal data from the first ticket seen for that user.
    if source['user_id'] not in userSet:
        PassengerTab.append((
            source['user_id'],
            source['personal'],
        ))
        userSet.add(source['user_id'])

TicketDF = sparkSession.createDataFrame(TicketTab, TicketSchema)
# Convert the purchase-time string to a Timestamp column, then drop the raw string.
TicketDF = TicketDF.withColumn("date_purchase", to_timestamp("date_purchase_str"))
TicketDF = TicketDF.drop("date_purchase_str")
TicketDF.printSchema()
TicketDF.show(5)

root
 |-- ticket_id: integer (nullable = false)
 |-- user_id: string (nullable = false)
 |-- train_id: integer (nullable = false)
 |-- price: float (nullable = false)
 |-- date_purchase: timestamp (nullable = true)

+---------+--------------------+--------+--------+-------------------+
|ticket_id|             user_id|train_id|   price|      date_purchase|
+---------+--------------------+--------+--------+-------------------+
|        1|d54eb390-46b3-47f...|       2|15444.95|2015-05-25 19:47:14|
|        2|7eb10bc3-88f8-49b...|      20| 3172.69|2011-11-16 15:12:41|
|        3|3e01f5b5-bfe9-4f2...|       5|11806.73|2019-10-18 11:55:25|
|        4|ed8df23b-0432-4fe...|       2| 1047.43|2015-06-16 18:47:37|
|        5|ed8df23b-0432-4fe...|       2| 2063.12|2015-06-16 18:47:37|
+---------+--------------------+--------+--------+-------------------+
only showing top 5 rows



In [10]:
# Turn the de-duplicated passenger rows into a DataFrame with the explicit schema
# and preview the first five rows.
PassengerDF = sparkSession.createDataFrame(
    data=PassengerTab,
    schema=PassengerSchema,
)
PassengerDF.show(n=5)

+--------------------+--------------------+
|             user_id|            personal|
+--------------------+--------------------+
|d54eb390-46b3-47f...|Галина Валентинов...|
|7eb10bc3-88f8-49b...|Белозерова Фёкла ...|
|3e01f5b5-bfe9-4f2...|Милий Анисимович ...|
|ed8df23b-0432-4fe...|Феоктист Трофимов...|
|a94a30eb-a111-421...|Никонова Светлана...|
+--------------------+--------------------+
only showing top 5 rows



In [11]:
from hdfs import InsecureClient

In [12]:
# Bind the HDFS client to its own name: reusing `client` replaced the
# Elasticsearch client, so re-running the search cells afterwards would fail.
# NOTE(review): InsecureClient talks to WebHDFS over HTTP, while port 9000 is the
# one used for the hdfs:// URIs when the CSVs are written — confirm the WebHDFS port.
hdfs_client = InsecureClient('http://localhost:9000', user='hduser')

In [17]:
# Each table is written to its own HDFS location with a header row;
# an existing export at the same path is overwritten.
csv_targets = [
    (TrainDF, 'hdfs://localhost:9000/db-cw/trains.csv'),
    (TicketDF, 'hdfs://localhost:9000/db-cw/tickets.csv'),
    (PassengerDF, 'hdfs://localhost:9000/db-cw/passengers.csv'),
]
for frame, target in csv_targets:
    frame.write.csv(path=target, mode='overwrite', header=True)

                                                                                

In [None]:
# Shut down the Spark session once all three tables have been written.
sparkSession.stop()