Carga de librerías

In [1]:
import os
import ingest_flights as src

from google.cloud import storage
from google.cloud import bigquery

Inspección rápida del URL de origen de datos

In [2]:
src.SOURCE

'https://storage.googleapis.com/data-science-on-gcp/edition2/raw'

# Pasos de arquitectura

## Descarga de datos

In [3]:
out_file = src.download(year='2018', month='10', destdir=os.path.join('data','raw'))
out_file

'data\\raw\\201810.zip'

## Descompresión

In [4]:
unziped = src.zip_to_csv(
    filename=out_file
    ,destdir=os.path.join('data','unziped'))

unziped

'data\\unziped\\On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.csv.gz'

## Bucket de Cloud Storage

Documentación en creación de [Buckets](https://cloud.google.com/storage/docs/creating-buckets#storage-create-bucket-client_libraries) usando clientes como Python

Documentación sobre las [clases](https://cloud.google.com/storage/docs/storage-classes) de almacenamiento

ID de proyecto:

```sh
gcloud config get-value project
```

In [5]:
PROJECT = 'ds-on-gcp-402120'
client = storage.Client(project=PROJECT)
bucket = client.bucket(bucket_name='ch02')
bucket.storage_class = 'COLDLINE'

if bucket.exists():
    print("Bucket {} with storage class {}".format(
        bucket.name, bucket.storage_class))
    
else:
    bucket = client.create_bucket(bucket, project=PROJECT, location='us-central1')

    print("Created bucket {} in {} with storage class {}".format(
        bucket.name, bucket.location, bucket.storage_class))

Bucket ch02 with storage class COLDLINE


## Ingesta de datos a Cloud Storage

In [6]:
src.upload(
    csvfile=unziped[:-3]
    ,bucketname=bucket.name
    ,blobname='201810.csv')

'gs://ch02/201810.csv'

## Carga a BigQuery

Antes debe existir el dataset de BigQuery en el que se almacenarán los datos

In [7]:
client = bigquery.Client(project=PROJECT)
bq_table = client.dataset('dsongcp').table('flights_raw$201810')

In [8]:
job_config = bigquery.LoadJobConfig()
job_config.source_format = 'CSV'
job_config.write_disposition = 'WRITE_TRUNCATE'
job_config.ignore_unknown_values = True
job_config.time_partitioning = bigquery.table.TimePartitioning('MONTH','FlightDate')
job_config.skip_leading_rows = 1
job_config.schema = [
    bigquery.SchemaField(col_and_type.split(':')[0], col_and_type.split(':')[1])  #, mode='required')
    for col_and_type in
    "Year:STRING,Quarter:STRING,Month:STRING,DayofMonth:STRING,DayOfWeek:STRING,FlightDate:DATE,Reporting_Airline:STRING,DOT_ID_Reporting_Airline:STRING,IATA_CODE_Reporting_Airline:STRING,Tail_Number:STRING,Flight_Number_Reporting_Airline:STRING,OriginAirportID:STRING,OriginAirportSeqID:STRING,OriginCityMarketID:STRING,Origin:STRING,OriginCityName:STRING,OriginState:STRING,OriginStateFips:STRING,OriginStateName:STRING,OriginWac:STRING,DestAirportID:STRING,DestAirportSeqID:STRING,DestCityMarketID:STRING,Dest:STRING,DestCityName:STRING,DestState:STRING,DestStateFips:STRING,DestStateName:STRING,DestWac:STRING,CRSDepTime:STRING,DepTime:STRING,DepDelay:STRING,DepDelayMinutes:STRING,DepDel15:STRING,DepartureDelayGroups:STRING,DepTimeBlk:STRING,TaxiOut:STRING,WheelsOff:STRING,WheelsOn:STRING,TaxiIn:STRING,CRSArrTime:STRING,ArrTime:STRING,ArrDelay:STRING,ArrDelayMinutes:STRING,ArrDel15:STRING,ArrivalDelayGroups:STRING,ArrTimeBlk:STRING,Cancelled:STRING,CancellationCode:STRING,Diverted:STRING,CRSElapsedTime:STRING,ActualElapsedTime:STRING,AirTime:STRING,Flights:STRING,Distance:STRING,DistanceGroup:STRING,CarrierDelay:STRING,WeatherDelay:STRING,NASDelay:STRING,SecurityDelay:STRING,LateAircraftDelay:STRING,FirstDepTime:STRING,TotalAddGTime:STRING,LongestAddGTime:STRING,DivAirportLandings:STRING,DivReachedDest:STRING,DivActualElapsedTime:STRING,DivArrDelay:STRING,DivDistance:STRING,Div1Airport:STRING,Div1AirportID:STRING,Div1AirportSeqID:STRING,Div1WheelsOn:STRING,Div1TotalGTime:STRING,Div1LongestGTime:STRING,Div1WheelsOff:STRING,Div1TailNum:STRING,Div2Airport:STRING,Div2AirportID:STRING,Div2AirportSeqID:STRING,Div2WheelsOn:STRING,Div2TotalGTime:STRING,Div2LongestGTime:STRING,Div2WheelsOff:STRING,Div2TailNum:STRING,Div3Airport:STRING,Div3AirportID:STRING,Div3AirportSeqID:STRING,Div3WheelsOn:STRING,Div3TotalGTime:STRING,Div3LongestGTime:STRING,Div3WheelsOff:STRING,Div3TailNum:STRING,Div4Airport:STRING,Div4AirportID:STRING,Div4AirportSeqID:STRING,Div4WheelsOn:STRING,Div4TotalGTime:STRING,Div4LongestGTime:STRING,Div4WheelsOff:STRING,Div4TailNum:STRING,Div5Airport:STRING,Div5AirportID:STRING,Div5AirportSeqID:STRING,Div5WheelsOn:STRING,Div5TotalGTime:STRING,Div5LongestGTime:STRING,Div5WheelsOff:STRING,Div5TailNum:STRING".split(',')
]

In [10]:
load_job = client.load_table_from_uri(
    source_uris='gs://ch02/201810.csv'
    ,destination=bq_table
    ,job_config=job_config
)

load_job.result()

LoadJob<project=ds-on-gcp-402120, location=us-central1, id=019ac474-e624-4d1f-83e2-c49afb9f9b4d>