### Batch MySQL to Spark (1)

In [97]:
import pandas as pd
from pyspark.sql import SparkSession

## Connect to MySQL and load the `airports` and `cities` tables.

# JDBC URL of the local batch database. The previous URL contained a stray
# space between the host and the database name ("127.0.0.1 /skyscanner_batch").
MYSQL_JDBC_URL = "jdbc:mysql://127.0.0.1/skyscanner_batch"


def read_mysql_table(table_name):
    """Load one table of the skyscanner_batch database as a Spark DataFrame.

    Both tables are read with identical connection options, so the JDBC
    reader chain is defined once here instead of being copy-pasted per table.

    Parameters
    ----------
    table_name : str
        Name of the MySQL table passed to the JDBC `dbtable` option.

    Returns
    -------
    The DataFrame produced by `sqlContext.read ... .load()`.
    """
    # NOTE(review): `sqlContext` is not created in the visible cells --
    # presumably it is injected by the PySpark kernel; TODO confirm.
    return (
        sqlContext.read.format("jdbc")
        .option("url", MYSQL_JDBC_URL)
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", table_name)
        # NOTE(review): root with an empty password is hardcoded here;
        # consider reading credentials from the environment instead.
        .option("user", "root")
        .option("password", "")
        .load()
    )


## Connect to MySQL (Airport)
dataframe_mysql_airport = read_mysql_table("airports")

## Connect to MySQL (Cities)
dataframe_mysql_cities = read_mysql_table("cities")


In [114]:
def _to_pandas_and_json(spark_df):
    """Collect a Spark DataFrame to pandas and serialise it as JSON records.

    Returns a `(pandas_df, json_text)` pair, where `json_text` is the result
    of `pandas_df.to_json(orient="records")`.
    """
    pandas_df = spark_df.toPandas()
    return pandas_df, pandas_df.to_json(orient="records")


# Airport table: pandas copy plus its JSON-records serialisation.
AirportDF, AirportJSON = _to_pandas_and_json(dataframe_mysql_airport)

# Cities table: same conversion.
CitiesDF, CitiesJSON = _to_pandas_and_json(dataframe_mysql_cities)


### Batch to Kafka Producer (disabled, not needed)

In [1]:
# NOTE(review): this whole cell is commented out (the section heading marks it
# as not needed). If it is ever re-enabled: iterating a pandas DataFrame yields
# its column labels, not its rows, so `lines` below would hold column names --
# presumably the row records (e.g. AirportJSON) were intended; TODO confirm.
# import kafka
# from kafka import KafkaProducer

# ## Connect to Producer
# producer = KafkaProducer(bootstrap_servers='localhost:9092')

# lines = [x.strip() for x in AirportDF]

# ## Send Airports table to Topic 'airports'
# for line in lines:
#     future = producer.send('airports', bytes(line, encoding= 'utf-8'))
#     producer.flush()


In [2]:
# NOTE(review): this cell is commented out. If re-enabled, note that
# iterating a pandas DataFrame yields its column labels, not its rows, so
# `lines2` would hold column names rather than city records; it also needs a
# `producer` object, which no active cell in view creates -- TODO confirm.
# lines2 = [n.strip() for n in CitiesDF]

# ## Send Cities table to Topic 'cities'
# for line2 in lines2:
#     future = producer.send('cities', bytes(line2, encoding= 'utf-8'))
#     producer.flush()


### Elasticsearch Index Creation for SQL Data  (2)

In [100]:
from elasticsearch import Elasticsearch

# Client for the Elasticsearch node on localhost:9200.
es = Elasticsearch('http://localhost:9200/')

# Create the airports index. With ignore=400 an HTTP 400 response (e.g. the
# index already exists) is returned as a dict instead of being raised.
# Kept as the cell's last expression so the response is displayed.
airports_index = "airports_api"
es.indices.create(index=airports_index, ignore=400)

TypeError: 'Elasticsearch' object is not iterable

In [105]:
# Second client object pointing at the same Elasticsearch URL as `es`;
# the cities indexing cell uses it.
es2 = Elasticsearch('http://localhost:9200/')

# Create the cities index. With ignore=400 an "already exists" response is
# returned as a dict (status 400) instead of being raised.
# Kept as the cell's last expression so the response is displayed.
cities_index = "cities_api"
es2.indices.create(index=cities_index, ignore=400)

{'error': {'root_cause': [{'type': 'resource_already_exists_exception',
    'reason': 'index [cities_api/760FkXplR_2xLGpau2kcAw] already exists',
    'index_uuid': '760FkXplR_2xLGpau2kcAw',
    'index': 'cities_api'}],
  'type': 'resource_already_exists_exception',
  'reason': 'index [cities_api/760FkXplR_2xLGpau2kcAw] already exists',
  'index_uuid': '760FkXplR_2xLGpau2kcAw',
  'index': 'cities_api'},
 'status': 400}

### Parse Data to Index in ES (3)

In [96]:
import json

# Index every airport row as one document in "airports_api".
# Each document carries exactly these three columns, in this order.
airport_fields = ("IATA", "Airport_Name", "City_ID")

for i, item in AirportDF.iterrows():
    document = {field: item[field] for field in airport_fields}
    es.index(index="airports_api", doc_type="json", body=document)

In [113]:
# Index every city row as one document in "cities_api".
#
# The Cities columns carry a leading space in their labels (' City_Name', ...),
# so each clean document field is mapped to the raw column label it is read
# from. Some values also carry stray whitespace (the captured output shows
# 'Roermond ' and 'Enschede '), so string values are stripped before indexing;
# otherwise those names are stored with the extra space.
city_field_to_column = {
    "City_ID": "City_ID",
    "City_Name": " City_Name",
    "Country_Name": " Country_Name",
    "Country_Code": " Country_Code",
}


def _strip_if_text(value):
    """Return `value` without surrounding whitespace when it is a string.

    Non-string values (e.g. the numeric City_ID) are returned unchanged.
    """
    return value.strip() if isinstance(value, str) else value


for x, items in CitiesDF.iterrows():
    es2.index(index="cities_api",
              doc_type="json",
              body={field: _strip_if_text(items[column])
                    for field, column in city_field_to_column.items()})

City_ID                    1
 City_Name             Assen
 Country_Name    Netherlands
 Country_Code             NL
Name: 0, dtype: object
City_ID                    2
 City_Name         Coevorden
 Country_Name    Netherlands
 Country_Code             NL
Name: 1, dtype: object
City_ID                    3
 City_Name             Emmen
 Country_Name    Netherlands
 Country_Code             NL
Name: 2, dtype: object
City_ID                    4
 City_Name         Hoogeveen
 Country_Name    Netherlands
 Country_Code             NL
Name: 3, dtype: object
City_ID                    5
 City_Name            Meppel
 Country_Name    Netherlands
 Country_Code             NL
Name: 4, dtype: object
City_ID                    6
 City_Name            Almere
 Country_Name    Netherlands
 Country_Code             NL
Name: 5, dtype: object
City_ID                      7
 City_Name       Biddinghuizen
 Country_Name      Netherlands
 Country_Code               NL
Name: 6, dtype: object
City_ID            

City_ID                   60
 City_Name          Montfort
 Country_Name    Netherlands
 Country_Code             NL
Name: 59, dtype: object
City_ID                   61
 City_Name        Nieuwstadt
 Country_Name    Netherlands
 Country_Code             NL
Name: 60, dtype: object
City_ID                   62
 City_Name         Roermond 
 Country_Name    Netherlands
 Country_Code             NL
Name: 61, dtype: object
City_ID                     63
 City_Name       Schin op Geul
 Country_Name      Netherlands
 Country_Code               NL
Name: 62, dtype: object
City_ID                   64
 City_Name           Sittard
 Country_Name    Netherlands
 Country_Code             NL
Name: 63, dtype: object
City_ID                   65
 City_Name             Stein
 Country_Name    Netherlands
 Country_Code             NL
Name: 64, dtype: object
City_ID                   66
 City_Name          Susteren
 Country_Name    Netherlands
 Country_Code             NL
Name: 65, dtype: object
City_ID     

City_ID                  118
 City_Name         Enschede 
 Country_Name    Netherlands
 Country_Code             NL
Name: 117, dtype: object
City_ID                  119
 City_Name        Genemuiden
 Country_Name    Netherlands
 Country_Code             NL
Name: 118, dtype: object
City_ID                  120
 City_Name           Hasselt
 Country_Name    Netherlands
 Country_Code             NL
Name: 119, dtype: object
City_ID                  121
 City_Name           Hengelo
 Country_Name    Netherlands
 Country_Code             NL
Name: 120, dtype: object
City_ID                  122
 City_Name            Kampen
 Country_Name    Netherlands
 Country_Code             NL
Name: 121, dtype: object
City_ID                  123
 City_Name         Oldenzaal
 Country_Name    Netherlands
 Country_Code             NL
Name: 122, dtype: object
City_ID                  124
 City_Name         Steenwijk
 Country_Name    Netherlands
 Country_Code             NL
Name: 123, dtype: object
City_ID      

### Kafka consumer for the `crowdedness` topic (TODO: do this in PySpark?)

In [3]:
from kafka import KafkaConsumer
import pandas

# Read the 'crowdedness' topic from the earliest available offset.
# consumer_timeout_ms=1000 ends the iteration below once no new message has
# arrived for one second, so the comprehension terminates instead of blocking.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=1000)
try:
    consumer.subscribe(['crowdedness'])
    # Each message value is raw bytes; decode to text.
    crowdedness = [msg.value.decode('utf-8') for msg in consumer]
finally:
    # Always release the broker connection, even if decoding fails;
    # previously the consumer was left open for the lifetime of the kernel.
    consumer.close()

# NOTE(review): in the captured output the values look shifted by one field
# relative to their keys (e.g. "Departure_Time": "25-11-2018", "Price":
# "Atlantic Airways"), and several messages are empty strings -- presumably a
# producer-side issue; TODO confirm upstream.
print(crowdedness)

['', '', '', '', '[{"Flight_ID": "0", "Flight_NR": "1", "Departure_Date": "2252", "Departure_Time": "25-11-2018", "Departure_Airport_ID": "3:40:00 AM", "Arrival_Date": "EIN", "Arrival_Time": "25-11-2018", "Destination_Airport_ID": "5:30:00 AM", "Airline": "LEY", "Price": "Atlantic Airways", "Delay": "746", "Cancellation": "FALSE"}, {"Flight_ID": "1", "Flight_NR": "2", "Departure_Date": "3028", "Departure_Time": "28-11-2018", "Departure_Airport_ID": "4:00:00 AM", "Arrival_Date": "AMS", "Arrival_Time": "28-11-2018", "Destination_Airport_ID": "5:00:00 AM", "Airline": "RTM", "Price": "Atlantic Airways", "Delay": "497", "Cancellation": "FALSE"}, {"Flight_ID": "2", "Flight_NR": "3", "Departure_Date": "6200", "Departure_Time": "30-11-2018", "Departure_Airport_ID": "7:40:00 PM", "Arrival_Date": "ENS", "Arrival_Time": "30-11-2018", "Destination_Airport_ID": "8:30:00 PM", "Airline": "RTM", "Price": "Atlantic Airways", "Delay": "505", "Cancellation": "TRUE"}, {"Flight_ID": "3", "Flight_NR": "4", 