# Routes.dat



    Airline --> 2-letter (IATA) or 3-letter (ICAO) code of the airline.
    Airline ID --> Unique OpenFlights identifier for airline (see Airline).
    Source airport --> 3-letter (IATA) or 4-letter (ICAO) code of the source airport.
    Source airport ID --> Unique OpenFlights identifier for source airport (see Airport)
    Destination airport --> 3-letter (IATA) or 4-letter (ICAO) code of the destination airport.
    Destination airport ID --> Unique OpenFlights identifier for destination airport (see Airport)
    Codeshare --> "Y" if this flight is a codeshare (that is, not operated by Airline, but another carrier), empty otherwise.
    Stops --> Number of stops on this flight ("0" for direct)
    Equipment --> 3-letter codes for plane type(s) generally used on this flight, separated by spaces

    The data is UTF-8 encoded. The special value \N is used for "NULL" to indicate that no value is available, and is understood automatically by MySQL if imported.




In [3]:
%%writefile spark_analysis.py

import argparse

# Command-line interface of the generated script. BUCKET is the GCS bucket that holds
# the inputs (routes.dat, whitehouse-waves-2014_03.csv) and receives the report PNGs.
# --bucket is required: without it every "gs://{}/...".format(BUCKET) path below would
# silently become "gs://None/...".
parser = argparse.ArgumentParser(description="Spark SQL analysis of routes and White House visitor data")
parser.add_argument("--bucket", required=True, help="bucket for input and output")
args = parser.parse_args()

BUCKET = args.bucket

Overwriting spark_analysis.py


In [4]:
%%writefile -a spark_analysis.py
from pyspark.sql import SparkSession, SQLContext, Row
spark = SparkSession.builder.appName("DataExploring").getOrCreate()
sc = spark.sparkContext

# OpenFlights routes file: one comma-separated route per line.
data_file = "gs://{}/routes.dat".format(BUCKET)

# Cached because this RDD is traversed several times: the sample below, the parsing step
# and the DataFrame creation that follows.
routes = sc.textFile(data_file).cache()

# In a script (unlike a notebook cell) a bare expression's value is discarded, so print
# the sample to make it visible in the job output.
print(routes.take(5))

Appending to spark_analysis.py


In [7]:
%%writefile -a spark_analysis.py
# Split each line on commas and wrap the first nine fields in a Row so Spark can infer a
# DataFrame schema. Every field stays a string. The OpenFlights NULL marker "\N" is kept
# as the literal two-character string.
# NOTE(review): map "\N" to None (and cast the *_ID / stops fields) if NULL/int semantics are needed.
routes_split = routes.map(lambda line: line.split(","))
parsed_routes = routes_split.map(lambda r: Row(
    airline=r[0],
    airline_ID=r[1],
    source_airport=r[2],
    source_airport_ID=r[3],
    destination_airport=r[4],
    destination_airport_ID=r[5],
    codeshare=r[6],
    stops=r[7],
    equipment=r[8],
))

# Bare expressions are discarded in a script; print the parsed sample instead.
print(parsed_routes.take(5))

Appending to spark_analysis.py


In [None]:
# airline_ID, destination_airport_ID, source_airport_ID

In [8]:
%%writefile -a spark_analysis.py
# Convert the parsed routes RDD into a DataFrame (schema inferred from the Row objects).
# SparkSession.createDataFrame replaces the deprecated SQLContext(sc).createDataFrame.
routes_df = spark.createDataFrame(parsed_routes)
routes_df.show(5)

Appending to spark_analysis.py


In [9]:
%%writefile -a spark_analysis.py
# Expose the DataFrame to Spark SQL as "routes". Unlike createTempView,
# createOrReplaceTempView does not raise if the view already exists in the session.
routes_df.createOrReplaceTempView("routes")

Appending to spark_analysis.py


In [10]:
%%writefile -a spark_analysis.py
# Top 20 (airline, destination airport) pairs by number of routes in routes.dat.
# The secondary ORDER BY keys break ties on the count, so LIMIT 20 returns the
# same rows on every run.
query = spark.sql("""
              SELECT 
                  airline,
                  destination_airport,
                  COUNT(destination_airport) as Total_Destination_Flight
              FROM routes
              GROUP BY destination_airport, airline
              ORDER BY Total_Destination_Flight DESC, airline, destination_airport
              LIMIT 20
              """)
query.show()

Appending to spark_analysis.py


In [11]:
%%writefile -a spark_analysis.py
# Horizontal bar chart of the top-20 routes result. Sorting ascending puts the largest
# count at the top of a barh chart. The query groups by (airline, destination_airport),
# so one airline can appear on several bars; label each bar with both keys.
routes_top_pd = query.sort("Total_Destination_Flight").toPandas()
routes_top_pd["airline_destination"] = (
    routes_top_pd["airline"] + " -> " + routes_top_pd["destination_airport"]
)
ax = routes_top_pd.plot.barh(
    x="airline_destination",
    y="Total_Destination_Flight",
    figsize=(10, 10),
    title="Top 20 airline / destination-airport pairs by number of routes",
    legend=False,
)
ax.set_xlabel("Number of routes")
ax.set_ylabel("Airline -> destination airport")

Appending to spark_analysis.py


In [12]:
%%writefile -a spark_analysis.py
# Write the routes chart to the local working directory (it is uploaded to GCS later in
# the script). bbox_inches="tight" grows the saved area so long y-axis labels aren't clipped.
ax.get_figure().savefig('report_routes.png', bbox_inches="tight")

Appending to spark_analysis.py


## White House Visitor Records


    UIN - Appointment Number 
    BDG NBR – Badge Number
    Access Type - Type of access to the complex (VA = Visitor Access)
    TOA – Time of Arrival
    POA –  Post of Arrival
    TOD – Time of Departure 
    POD – Post of Departure
    APPT_MADE_DATE – Date the Appointment was made.
    APPT_START_DATE – Date and time for which the appointment was scheduled
    APPT_END_DATE – Date and time for which the appointment was scheduled to end
    APPT_CANCEL_DATE – Date the appointment was canceled, if applicable
    Total_People – The total number of people scheduled for a particular appointment per requestor
    LAST_UPDATEDBY – Identifier of officer that updated record
    POST – Computer used to enter appointment
    LastEntryDate – Most recent update to appointment
    TERMINAL_SUFFIX - Identifier of officer that entered appointment
    visitee_namelast – Last name of the visitee
    visitee_namefirst – First name of the visitee
    MEETING_LOC – Building in which meeting was scheduled
    MEETING_ROOM – Room in which meeting was scheduled
    CALLER_NAME_LAST – Last name of the individual that submitted the WAVES request
    CALLER_NAME_FIRST – First name of the individual that submitted the WAVES request
    CALLER_ROOM – Room from which the appointment was made 
    
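    Amaç: Beyaz Saray'a gelen ziyaretçilerin en çok kime geldiğini bulmak.
    POTUS (President of the United States kısaltması) değerinin geçtiği bir kolon var; o kolon, ziyaretçilerin kime geldiğini gösteriyor.
    (English) Goal: find whom White House visitors came to see most often. One column contains the value POTUS (short for President of the United States); that column records whom each visit was for.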


In [13]:
%%writefile -a spark_analysis.py
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType


# White House WAVES visitor-records CSV; the first line holds the column names.
path = "gs://{}/whitehouse-waves-2014_03.csv".format(BUCKET)

white_house_visitor_record = spark.read.option("header", True).csv(path)

# Without inferSchema every column is read as a string: cast the head-count to int ...
white_house_visitor_record = white_house_visitor_record.withColumn(
    "Total_People", col("Total_People").cast(IntegerType())
)

# ... and each date column to DateType, in the same order as before.
# NOTE(review): a plain string -> date cast only understands ISO-style values (yyyy-MM-dd...).
# With ANSI mode off, anything else becomes NULL instead of failing. If these columns use
# another layout (e.g. M/d/yyyy H:mm), use pyspark.sql.functions.to_date(col, fmt) instead.
# TODO confirm against the printSchema()/show() output.
for date_column in ("APPT_MADE_DATE", "APPT_START_DATE", "APPT_END_DATE",
                    "APPT_CANCEL_DATE", "LASTENTRYDATE", "RELEASE_DATE"):
    white_house_visitor_record = white_house_visitor_record.withColumn(
        date_column, col(date_column).cast(DateType())
    )

white_house_visitor_record.printSchema()

Appending to spark_analysis.py


In [14]:
%%writefile -a spark_analysis.py
# Preview the first two visitor records after the type casts.
white_house_visitor_record.show(n=2)

Appending to spark_analysis.py


In [None]:
# visitee_namefirst --> POTUS

In [15]:
%%writefile -a spark_analysis.py
# Expose the visitor DataFrame to Spark SQL. createOrReplaceTempView does not raise if
# the view already exists in the session, so this block can be re-run.
white_house_visitor_record.createOrReplaceTempView("white_house_visitor")

# Top 20 visitees by number of visitor records.
# - Rows whose visitee_namelast is the literal string 'null' are excluded.
# - Rows whose visitee_namefirst is 'VISITORS' are excluded.
# - A comparison against a NULL name yields NULL, so rows with a missing first or last
#   name are dropped as well.
# - The single-quoted 'null' is a string literal in every SQL dialect mode.
# - Ties on the count are broken by name, so LIMIT 20 is deterministic.
white_house_query = spark.sql("""
                        SELECT 
                            visitee_namefirst ||' '|| visitee_namelast as visite_fullname,
                            COUNT(visitee_namefirst) as Total_Visit
                        FROM white_house_visitor
                        WHERE visitee_namelast != 'null' and visitee_namefirst != 'VISITORS'
                        GROUP BY visite_fullname
                        ORDER BY Total_Visit DESC, visite_fullname
                        LIMIT 20
                        """)
white_house_query.show()

Appending to spark_analysis.py


In [16]:
%%writefile -a spark_analysis.py
# Horizontal bar chart of the 20 most-visited people. The ascending sort puts the largest
# bar at the top. Title and axis labels make the saved PNG readable on its own.
ax = white_house_query.sort("Total_Visit").toPandas().plot.barh(
    x='visite_fullname',
    y='Total_Visit',
    figsize=(10, 10),
    title="Top 20 White House visitees by number of visitor records",
    legend=False,
)
ax.set_xlabel("Number of visitor records")
ax.set_ylabel("Visitee")

Appending to spark_analysis.py


In [17]:
%%writefile -a spark_analysis.py
# Write the visitor chart to the local working directory (it is uploaded to GCS later in
# the script). bbox_inches="tight" keeps long visitee-name labels from being clipped.
ax.get_figure().savefig('report_white_house_visitors.png', bbox_inches="tight")

Appending to spark_analysis.py


In [18]:
%%writefile -a spark_analysis.py

import google.cloud.storage as gcs

# Publish the locally rendered charts under gs://<BUCKET>/sparksqlondataproc/.
OUTPUT_PREFIX = 'sparksqlondataproc/'
REPORT_FILES = ['report_routes.png', 'report_white_house_visitors.png']
# The notebook also uploads this script to the same prefix. Keep it when clearing out
# results from earlier runs, otherwise every run deletes its own uploaded source.
SCRIPT_BLOB = OUTPUT_PREFIX + 'spark_analysis.py'

bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix=OUTPUT_PREFIX):
    if blob.name != SCRIPT_BLOB:
        blob.delete()

# Upload each report exactly once.
for fname in REPORT_FILES:
    bucket.blob(OUTPUT_PREFIX + fname).upload_from_filename(fname)

Appending to spark_analysis.py


In [None]:
#%%writefile -a spark_analysis.py
#
#connections_by_protocol.write.format("csv").mode("overwrite").save(
#    "gs://{}/sparksqlondataproc/connections_by_protocol".format(BUCKET))
#
#connections_by_protocol.write.format("csv").mode("overwrite").save(
#    "gs://{}/sparksqlondataproc/connections_by_protocol".format(BUCKET))

In [20]:
BUCKET= 'de-projects-339514'  # CHANGE
print('Writing to {}'.format(BUCKET))
!python spark_analysis.py --bucket=$BUCKET

Writing to de-projects-339514
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/29 13:36:43 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #0,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
22/01/29 13:36:43 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thr

                                                                                

In [22]:
!gsutil ls gs://$BUCKET/**

gs://de-projects-339514/Project2.ipynb
gs://de-projects-339514/routes.dat
gs://de-projects-339514/sparksqlondataproc/report_routes.png
gs://de-projects-339514/sparksqlondataproc/report_white_house_visitors.png
gs://de-projects-339514/whitehouse-waves-2014_03.csv


In [6]:
# Print the active gcloud project ID (in this run it is the same string as the bucket name used above).
!gcloud info --format='value(config.project)'

de-projects-339514


In [23]:
!gsutil cp spark_analysis.py gs://de-projects-339514/sparksqlondataproc/spark_analysis.py

Copying file://spark_analysis.py [Content-Type=text/x-python]...
/ [1 files][  3.7 KiB/  3.7 KiB]                                                
Operation completed over 1 objects/3.7 KiB.                                      
