## Ingest from external source into Google cloud storage buckets

In [1]:
import os
import shutil
import logging
import os.path
import zipfile
import datetime
import tempfile
import urllib.request as urllib2
from google.cloud import storage
from google.cloud.storage import Blob

In [2]:
PROJECT='ticino-2018'
BUCKET_NAME = 'ingres'
PREFIX = 'bts'

---
### Download the file for the given month and year from the BTS site
The ```PARAMS```  string is extracted from a browser request on that site. Described in the GCP Book.

In [3]:
def download(year, month, destdir):
    '''
     Downloads on-time performance data and returns local filename
     YEAR e.g.'2015'
     MONTH e.g. '01 for January
    '''
    logging.info('Requesting data for {}-{}-*'.format(year, month))

    PARAMS="UserTableName=On_Time_Performance&DBShortName=&RawDataTable=T_ONTIME&sqlstr=+SELECT+FL_DATE%2CUNIQUE_CARRIER%2CAIRLINE_ID%2CCARRIER%2CFL_NUM%2CORIGIN_AIRPORT_ID%2CORIGIN_AIRPORT_SEQ_ID%2CORIGIN_CITY_MARKET_ID%2CORIGIN%2CDEST_AIRPORT_ID%2CDEST_AIRPORT_SEQ_ID%2CDEST_CITY_MARKET_ID%2CDEST%2CCRS_DEP_TIME%2CDEP_TIME%2CDEP_DELAY%2CTAXI_OUT%2CWHEELS_OFF%2CWHEELS_ON%2CTAXI_IN%2CCRS_ARR_TIME%2CARR_TIME%2CARR_DELAY%2CCANCELLED%2CCANCELLATION_CODE%2CDIVERTED%2CDISTANCE+FROM++T_ONTIME+WHERE+Month+%3D{1}+AND+YEAR%3D{0}&varlist=FL_DATE%2CUNIQUE_CARRIER%2CAIRLINE_ID%2CCARRIER%2CFL_NUM%2CORIGIN_AIRPORT_ID%2CORIGIN_AIRPORT_SEQ_ID%2CORIGIN_CITY_MARKET_ID%2CORIGIN%2CDEST_AIRPORT_ID%2CDEST_AIRPORT_SEQ_ID%2CDEST_CITY_MARKET_ID%2CDEST%2CCRS_DEP_TIME%2CDEP_TIME%2CDEP_DELAY%2CTAXI_OUT%2CWHEELS_OFF%2CWHEELS_ON%2CTAXI_IN%2CCRS_ARR_TIME%2CARR_TIME%2CARR_DELAY%2CCANCELLED%2CCANCELLATION_CODE%2CDIVERTED%2CDISTANCE&grouplist=&suml=&sumRegion=&filter1=title%3D&filter2=title%3D&geo=All%A0&time=March&timename=Month&GEOGRAPHY=All&XYEAR={0}&FREQUENCY=3&VarDesc=Year&VarType=Num&VarDesc=Quarter&VarType=Num&VarDesc=Month&VarType=Num&VarDesc=DayofMonth&VarType=Num&VarDesc=DayOfWeek&VarType=Num&VarName=FL_DATE&VarDesc=FlightDate&VarType=Char&VarName=UNIQUE_CARRIER&VarDesc=UniqueCarrier&VarType=Char&VarName=AIRLINE_ID&VarDesc=AirlineID&VarType=Num&VarName=CARRIER&VarDesc=Carrier&VarType=Char&VarDesc=TailNum&VarType=Char&VarName=FL_NUM&VarDesc=FlightNum&VarType=Char&VarName=ORIGIN_AIRPORT_ID&VarDesc=OriginAirportID&VarType=Num&VarName=ORIGIN_AIRPORT_SEQ_ID&VarDesc=OriginAirportSeqID&VarType=Num&VarName=ORIGIN_CITY_MARKET_ID&VarDesc=OriginCityMarketID&VarType=Num&VarName=ORIGIN&VarDesc=Origin&VarType=Char&VarDesc=OriginCityName&VarType=Char&VarDesc=OriginState&VarType=Char&VarDesc=OriginStateFips&VarType=Char&VarDesc=OriginStateName&VarType=Char&VarDesc=OriginWac&VarType=Num&VarName=DEST_AIRPORT_ID&VarDesc=DestAirportID&VarType=Num&VarName=DEST_AIRPORT_SEQ_ID&VarDesc=DestAirportSeqID&VarType=Num&VarName=DEST_CITY_MARKET_ID&VarDesc=DestCityMarketID&VarType=Num&VarName=DEST&VarDesc=Dest&VarType=Char&VarDesc=DestCityName&VarType=Char&VarDesc=DestState&VarType=Char&VarDesc=DestStateFips&VarType=Char&VarDesc=DestStateName&VarType=Char&VarDesc=DestWac&VarType=Num&VarName=CRS_DEP_TIME&VarDesc=CRSDepTime&VarType=Char&VarName=DEP_TIME&VarDesc=DepTime&VarType=Char&VarName=DEP_DELAY&VarDesc=DepDelay&VarType=Num&VarDesc=DepDelayMinutes&VarType=Num&VarDesc=DepDel15&VarType=Num&VarDesc=DepartureDelayGroups&VarType=Num&VarDesc=DepTimeBlk&VarType=Char&VarName=TAXI_OUT&VarDesc=TaxiOut&VarType=Num&VarName=WHEELS_OFF&VarDesc=WheelsOff&VarType=Char&VarName=WHEELS_ON&VarDesc=WheelsOn&VarType=Char&VarName=TAXI_IN&VarDesc=TaxiIn&VarType=Num&VarName=CRS_ARR_TIME&VarDesc=CRSArrTime&VarType=Char&VarName=ARR_TIME&VarDesc=ArrTime&VarType=Char&VarName=ARR_DELAY&VarDesc=ArrDelay&VarType=Num&VarDesc=ArrDelayMinutes&VarType=Num&VarDesc=ArrDel15&VarType=Num&VarDesc=ArrivalDelayGroups&VarType=Num&VarDesc=ArrTimeBlk&VarType=Char&VarName=CANCELLED&VarDesc=Cancelled&VarType=Num&VarName=CANCELLATION_CODE&VarDesc=CancellationCode&VarType=Char&VarName=DIVERTED&VarDesc=Diverted&VarType=Num&VarDesc=CRSElapsedTime&VarType=Num&VarDesc=ActualElapsedTime&VarType=Num&VarDesc=AirTime&VarType=Num&VarDesc=Flights&VarType=Num&VarName=DISTANCE&VarDesc=Distance&VarType=Num&VarDesc=DistanceGroup&VarType=Num&VarDesc=CarrierDelay&VarType=Num&VarDesc=WeatherDelay&VarType=Num&VarDesc=NASDelay&VarType=Num&VarDesc=SecurityDelay&VarType=Num&VarDesc=LateAircraftDelay&VarType=Num&VarDesc=FirstDepTime&VarType=Char&VarDesc=TotalAddGTime&VarType=Num&VarDesc=LongestAddGTime&VarType=Num&VarDesc=DivAirportLandings&VarType=Num&VarDesc=DivReachedDest&VarType=Num&VarDesc=DivActualElapsedTime&VarType=Num&VarDesc=DivArrDelay&VarType=Num&VarDesc=DivDistance&VarType=Num&VarDesc=Div1Airport&VarType=Char&VarDesc=Div1AirportID&VarType=Num&VarDesc=Div1AirportSeqID&VarType=Num&VarDesc=Div1WheelsOn&VarType=Char&VarDesc=Div1TotalGTime&VarType=Num&VarDesc=Div1LongestGTime&VarType=Num&VarDesc=Div1WheelsOff&VarType=Char&VarDesc=Div1TailNum&VarType=Char&VarDesc=Div2Airport&VarType=Char&VarDesc=Div2AirportID&VarType=Num&VarDesc=Div2AirportSeqID&VarType=Num&VarDesc=Div2WheelsOn&VarType=Char&VarDesc=Div2TotalGTime&VarType=Num&VarDesc=Div2LongestGTime&VarType=Num&VarDesc=Div2WheelsOff&VarType=Char&VarDesc=Div2TailNum&VarType=Char&VarDesc=Div3Airport&VarType=Char&VarDesc=Div3AirportID&VarType=Num&VarDesc=Div3AirportSeqID&VarType=Num&VarDesc=Div3WheelsOn&VarType=Char&VarDesc=Div3TotalGTime&VarType=Num&VarDesc=Div3LongestGTime&VarType=Num&VarDesc=Div3WheelsOff&VarType=Char&VarDesc=Div3TailNum&VarType=Char&VarDesc=Div4Airport&VarType=Char&VarDesc=Div4AirportID&VarType=Num&VarDesc=Div4AirportSeqID&VarType=Num&VarDesc=Div4WheelsOn&VarType=Char&VarDesc=Div4TotalGTime&VarType=Num&VarDesc=Div4LongestGTime&VarType=Num&VarDesc=Div4WheelsOff&VarType=Char&VarDesc=Div4TailNum&VarType=Char&VarDesc=Div5Airport&VarType=Char&VarDesc=Div5AirportID&VarType=Num&VarDesc=Div5AirportSeqID&VarType=Num&VarDesc=Div5WheelsOn&VarType=Char&VarDesc=Div5TotalGTime&VarType=Num&VarDesc=Div5LongestGTime&VarType=Num&VarDesc=Div5WheelsOff&VarType=Char&VarDesc=Div5TailNum&VarType=Char".format(year, month)
    url='https://www.transtats.bts.gov/DownLoad_Table.asp?Table_ID=236&Has_Group=3&Is_Zipped=0'
    filename = os.path.join(destdir, "{}{}.zip".format(year, month))
    with open(filename, "wb") as fp:
        response = urllib2.urlopen(url, PARAMS.encode('UTF-8'))
        fp.write(response.read())
        logging.debug("{} saved".format(filename))
    return filename

---
### Making sure that the working directory for the download and initial processing is clear:

In [6]:
DESTDIR = "/home/wgiersche/tmp/ingres"
YEAR = '2013'
MONTH = '05'
for f in os.listdir(DESTDIR):
    logging.warning('temp directory {0} not empty, removing {1}'.format(DESTDIR, f))
    os.remove(os.path.join(DESTDIR, f))
os.rmdir(DESTDIR)
os.mkdir(DESTDIR)



---
The download will take a couple of seconds up to a minute. Be patient...

In [7]:
downloaded_file = download(YEAR, MONTH, DESTDIR)

In [8]:
!ls -l /home/wgiersche/tmp/ingres

total 16396
-rw-rw-r-- 1 wgiersche wgiersche 16785734 Jul  7 19:42 201305.zip


---
### Unzip the file

In [9]:
def zip_to_csv(filename, destdir):
    zip_ref = zipfile.ZipFile(filename, 'r')
    cwd = os.getcwd()
    os.chdir(destdir)
    zip_ref.extractall()
    os.chdir(cwd)
    csvfile = os.path.join(destdir, zip_ref.namelist()[0])
    zip_ref.close()
    logging.info("Extracted {}".format(csvfile))
    return csvfile

In [10]:
csvfile = zip_to_csv(downloaded_file, DESTDIR)

In [11]:
!ls -l /home/wgiersche/tmp/ingres

total 108276
-rw-rw-r-- 1 wgiersche wgiersche 16785734 Jul  7 19:42 201305.zip
-rw-rw-r-- 1 wgiersche wgiersche 94084402 Jul  7 19:43 74786420_T_ONTIME.csv


---
### We perform a very first, very basic cleaning step even before uploading

In [12]:
def remove_quotes_comma(csvfile, year, month):
 '''
     returns output_csv_file or raises DataUnavailable exception
 '''
 try:
   outfile = os.path.join(os.path.dirname(csvfile),
                          '{}{}.csv'.format(year, month))
   with open(csvfile, 'r') as infp:
     with open(outfile, 'w') as outfp:
        for line in infp:
           outline = line.rstrip().rstrip(',').replace('"', '')
           outfp.write(outline)
           outfp.write('\n')
   logging.debug('Ingested {} ...'.format(outfile))
   return outfile
 finally:
   logging.debug("... removing {}".format(csvfile))
   os.remove(csvfile)

In [13]:
csvfile = remove_quotes_comma(csvfile, YEAR, MONTH)

In [14]:
!ls -l /home/wgiersche/tmp/ingres

total 94884
-rw-rw-r-- 1 wgiersche wgiersche 80368297 Jul  7 19:43 201305.csv
-rw-rw-r-- 1 wgiersche wgiersche 16785734 Jul  7 19:42 201305.zip


---
### Check whether the file is good, raise Exception if not.

In [15]:
class DataUnavailable(Exception):
   def __init__(self, message):
      self.message = message

class UnexpectedFormat(Exception):
   def __init__(self, message):
      self.message = message

def verify_ingest(csvfile):
   expected_header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE'

   with open(csvfile, 'r') as csvfp:
      firstline = csvfp.readline().strip()
      if (firstline != expected_header):
         os.remove(csvfile)
         msg = 'Got header={}, but expected={}'.format(
                             firstline, expected_header)
         logging.error(msg)
         raise UnexpectedFormat(msg)

      if next(csvfp, None) == None:
         os.remove(csvfile)
         msg = ('Received a file from BTS that has only the header and no content')
         raise DataUnavailable(msg)

In [16]:
verify_ingest(csvfile)

---
### Uploading to cloud storage buckets
We need to have some service account credentials downloaded (here: to /home/wgiersche/.auth/ticino-2018.csv) and refered to by environment variable ```GOOGLE_APPLICATION_CREDENTIALS```
The account must have a role that allows uploading to Google cloud storage. See: 

https://console.cloud.google.com/iam-admin/iam?project=ticino-2018

In [17]:
%%bash
echo $GOOGLE_APPLICATION_CREDENTIALS
cat $GOOGLE_APPLICATION_CREDENTIALS

/home/wgiersche/.auth/ticino-2018.json
{
  "type": "service_account",
  "project_id": "ticino-2018",
  "private_key_id": "947c1a7ebc697d3d6339e197b07dbb09659c534c",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC1Fld6sVxOsJYJ\njk3MqC5jkCEumshWY1EghbggSXsVbBlCceL6ubEbWL1aIf9Quocyz0mRcZUMikUa\nGohQL+45Bo9d7n8Zl2MFIlHTOMD3vfj1sFG97+1Lk9M/dQDvkJFRFwOxbLZfUfhs\nrTg6Q5Iqg1SASF5NMqd7bjbcWPdqmi3oZLFpp/Wrz6waq+70XILVvheqh5yKx8aG\nTTT7wG2W7ZjKAfRi0Myr0wbREVzsCBLBqiFCnRv9+/2X7QBOUWnZ+OcOQSmEOZ5+\nl4WdrYhYbgRCS423KGEOhGXucvvvl5dV2IHWWdW345sUo+gZmKWnGIihJmPOYFPd\nOyE89EJTAgMBAAECggEAPSadReZsyHnjQLjYVtsYRzDds7I7IskJFVVylxyZPuOx\nJ/nzbz2TrCtsTeqbIDTNAQB01HOppIFw0l7RV1rINGOduFeYfhjcLFipjw1kRITX\ndIglImHKHTJ/LLejKEMGf36pFZojf87beD+KdEprKoYM3AEULzeCzTCrv/8fFdUF\n2SzwWm55UYOFGGOvZSAxs081j7eyhsnsZZAhBJ1mnyPvGo5HxqZSPujw2NaNm79P\npL1ri+E3F3lMaz7l4qJlbvSMIqspiKpbAjB0waa6Ndd0Qt1Zi1h1jOLi48BiiAES\nRr/+FswSxAU4SZvvr/D9Z7uTM7a3JWCmossEEGTKYQKBgQD5iFVvngAlB2t1oo

In [18]:
def upload(csvfile, bucketname, basename):
    client = storage.Client()
    bucket = client.get_bucket(bucketname)
    blobname="{}/{}".format(PREFIX, basename)
    blob = Blob(blobname, bucket)
    blob.upload_from_filename(csvfile)
    gcslocation = 'gs://{}/{}/{}'.format(bucketname, PREFIX, basename)
    logging.info('Uploaded {} ...'.format(gcslocation))
    return gcslocation

In [20]:
basename = os.path.basename(csvfile)
gcslocation = upload(csvfile, BUCKET_NAME, basename)
gcslocation

'gs://ingres/bts/201305.csv'

---
### Putting it all together 

In [21]:
def ingest(year, month, bucket):
    '''
    ingest flights data from BTS website to Google Cloud Storage
    return cloud-storage-blob-name on success.
    raises DataUnavailable if this data is not on BTS website
    '''
    tempdir = tempfile.mkdtemp(prefix='ingest_flights')
    try:
        zipfile = download(year, month, tempdir)
        bts_csv = zip_to_csv(zipfile, tempdir)
        csvfile = remove_quotes_comma(bts_csv, year, month)
        verify_ingest(csvfile)
        return upload(csvfile, bucket, os.path.basename(csvfile))
    finally:
        logging.debug('Cleaning up by removing {}'.format(tempdir))
        #shutil.rmtree(tempdir)

In [22]:
ingest('2013', '06', BUCKET_NAME)

'gs://ingres/bts/201306.csv'

---
### Automatically download the file subsequent to the most recent file that's already in the bucket

In [23]:
def compute_next_month(year, month):
    dt = datetime.datetime(int(year), int(month), 15) # 15th of month
    dt = dt + datetime.timedelta(30) # will always go to next month
    logging.debug('The next month is {}'.format(dt))
    return '{}'.format(dt.year), '{:02d}'.format(dt.month)

def next_month(bucketname):
    '''
     Finds which months are on GCS, and returns next year,month to download
    '''
    client = storage.Client()
    bucket = client.get_bucket(bucketname)
    blobs  = list(bucket.list_blobs(prefix=PREFIX))
    files = [blob.name for blob in blobs if 'csv' in blob.name] # csv files only
    lastfile = os.path.basename(files[-1])
    logging.debug('The latest file on GCS is {}'.format(lastfile))
    year = lastfile[:4]
    month = lastfile[4:6]
    return compute_next_month(year, month)

In [24]:
next_month(BUCKET_NAME)

('2017', '02')

In [25]:
def ingest_next(bucket):
    year, month = next_month(bucket)
    return ingest(year, month, bucket)

In [26]:
ingest_next(BUCKET_NAME)

'gs://ingres/bts/201702.csv'

---
### Activating the service account for gcloud tool
Additionally, creating the app requires Project Owner role on the service account.
A bit weird, but I needed to deploy some "default" service. init_appengine.sh does that. It deploys some kind of hello world.

This is basically what I did on my machine
``` bash
gcloud auth activate-service-account wgiersche@ticino-2018.iam.gserviceaccount.com --key-file ~/.auth/ticino-2018.json
gcloud app create --region europe-west

###################################### Initialization. Only once for an app. There must be a more professional way of doing this!
git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/appengine/standard/hello_world
gcloud app deploy --quiet --stop-previous-version
############ End of initialization 

cd ~/workspace/exploration/
gcloud app deploy
```

Deployment takes quite a while. Don't lose your patience, ok?