# DATA3404 Big Data Assignment Bootstrap

### 1. Preparing the GoogleDriveDownloader.
The following cell defines a `GoogleDriveDownloader` class for this notebook. The original version of this package on PyPI is broken due to a Google update; this pared-down version works around the issue for our use case.

In [0]:
from __future__ import print_function
import requests
import zipfile
import warnings
from sys import stdout
from os import makedirs
from os.path import dirname
from os.path import exists


class GoogleDriveDownloader:
    CHUNK_SIZE = 32768
    DOWNLOAD_URL = 'https://docs.google.com/uc?export=download'

    @staticmethod
    def download_file_from_google_drive(file_id, dest_path, overwrite=False, unzip=False):
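        destination_directory = dirname(dest_path)
        # dirname() returns '' for a bare file name, so only create real directories
        if destination_directory and not exists(destination_directory):
            makedirs(destination_directory)

        if not exists(dest_path) or overwrite:
            session = requests.Session()
            print('Downloading {} into {}... '.format(file_id, dest_path), end='')
            stdout.flush()
            # 'confirm' is meant to skip Google Drive's large-file virus-scan confirmation page
            params = {'id': file_id, 'confirm': "T"}
            response = session.get(GoogleDriveDownloader.DOWNLOAD_URL, params=params, stream=True)
            response.raise_for_status()  # fail loudly on HTTP errors instead of saving an error page
            GoogleDriveDownloader._save_response_content(response, dest_path)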
            print('Done.')

            if unzip:
                try:
                    print('Unzipping...', end='')
                    stdout.flush()
                    with zipfile.ZipFile(dest_path, 'r') as z:
                        z.extractall(destination_directory)
                    print('Done.')
                except zipfile.BadZipFile:
                    warnings.warn('Ignoring `unzip` since "{}" does not look like a valid zip file'.format(dest_path))

    @staticmethod
    def _save_response_content(response, destination):
        with open(destination, 'wb') as f:
            for chunk in response.iter_content(GoogleDriveDownloader.CHUNK_SIZE):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
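The `_save_response_content` helper above streams the download to disk in `CHUNK_SIZE` pieces rather than loading the whole file into memory, which matters for the 1 GB and 8 GB files. A minimal, network-free sketch of the same pattern, assuming a hypothetical `FakeResponse` class that only mimics `requests.Response.iter_content`:

```python
import os
import tempfile


class FakeResponse:
    """Hypothetical stand-in for a streamed requests.Response (no network needed)."""

    def __init__(self, chunks):
        self._chunks = chunks

    def iter_content(self, chunk_size):
        # A real Response yields pieces of at most chunk_size bytes;
        # here we simply replay pre-made chunks, including an empty one.
        for chunk in self._chunks:
            yield chunk


def save_response_content(response, destination, chunk_size=32768):
    """Write a streamed response to disk chunk by chunk, skipping empty chunks."""
    written = 0
    with open(destination, "wb") as f:
        for chunk in response.iter_content(chunk_size):
            if chunk:  # empty chunks carry no data
                f.write(chunk)
                written += len(chunk)
    return written


demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv")
n_bytes = save_response_content(FakeResponse([b"a,b\n", b"", b"1,2\n"]), demo_path)
with open(demo_path, "rb") as f:
    content = f.read()
print(n_bytes, content)  # 8 b'a,b\n1,2\n'
```

Only one chunk is held in memory at a time, so peak memory stays near `chunk_size` regardless of file size.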

### 2. Download data files from Google Drive and upload them into the Databricks FileStore.
We perform these steps first so that the data persists: files downloaded only to the cluster's local disk (such as `/tmp`) are lost when the cluster is terminated, whereas files in the DBFS FileStore are kept.

First we define which files we want to retrieve from the lecture's Google Drive folder.

In [0]:
dbfs_fileStore_prefix = "/FileStore/tables"
prefix = "ontimeperformance"
files = [
  { "name": f"{prefix}_flights_small.csv",   "file_id": "1MtLQ9e9rZkqV5LCZykrke0JFmJer38A5" },
  { "name": f"{prefix}_flights_medium.csv",  "file_id": "1JhpTPr-CLeIKu-NyBALEwwYaUsjqIoU9" },
  { "name": f"{prefix}_flights_large.csv", "file_id": "1lF4DL5U3gTKmbV4qFcHkev0sV1vP8gqw" },
  { "name": f"{prefix}_airlines.csv", "file_id": "1hnKMGW2GTATRzn6T7ut9GgaJ7-d4RAH7" },
  { "name": f"{prefix}_airports.csv", "file_id": "195HuQBaixRCyMrT-p6hv-GuZl7zsL5qI" },
  { "name": f"{prefix}_aircrafts.csv", "file_id": "1wLFRY1Wi3knF-V9z_49zmxhmXhK-EiJe" }
]

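# Zip archive holding the extra-large (~8 GB) "massive" flights dataset; only used in the optional last step
large_files = { "name": f"{prefix}_flights_large.zip", "file_id": "1USC4NI41j6LvqH7S4QYMn7wgHtWwSgo9" }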

**Preparation:** Remove any existing copies of the files, both locally in `/tmp` and in DBFS.

In [0]:
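import os
# Remove stale copies from the driver's local /tmp and from DBFS so the cells below start clean
for file in files:
  local_path = "/tmp/{}".format(file["name"])
  if os.path.exists(local_path):
    os.remove(local_path)
  dbutils.fs.rm("{}/{}".format(dbfs_fileStore_prefix, file["name"]))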
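The local half of this clean-up checks for the file and then removes it. An equivalent sketch in the "try it and catch the error" style, assuming a hypothetical `remove_if_present` helper; it covers only the driver-local `/tmp` copy, not the `dbutils.fs.rm` call on DBFS:

```python
import tempfile
from pathlib import Path


def remove_if_present(path):
    """Hypothetical helper: delete a local file, returning True if it existed."""
    try:
        Path(path).unlink()
        return True
    except FileNotFoundError:
        # Nothing to delete: treat a missing file as already cleaned up
        return False


# Demo on a throwaway temp directory instead of the real /tmp copies
target = Path(tempfile.mkdtemp()) / "ontimeperformance_airlines.csv"
target.write_text("placeholder\n")
first = remove_if_present(target)   # file existed, so it is removed
second = remove_if_present(target)  # already gone, no exception raised
print(first, second)  # True False
```

Catching `FileNotFoundError` avoids the small window between an `exists()` check and the `remove()` call in which another process could delete the file.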

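**Download:** Download the small, medium and large (1 GB) flight datasets, together with the airline, airport and aircraft tables, to the driver's local `/tmp` directory; the following cell then moves them into DBFS.

Because the list includes the large data file, the download may take 20 to 30 seconds, plus another minute or so for the subsequent move into the tables folder.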

In [0]:
for file in files:
  GoogleDriveDownloader.download_file_from_google_drive(file_id=file["file_id"], dest_path="/tmp/{}".format(file["name"]))

In [0]:
for file in files:
  dbutils.fs.mv("file:/tmp/{}".format(file["name"]), "/FileStore/tables/{}".format(file["name"]))
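The cells above refer to each data file in three different ways: Python's `os` functions use the plain driver-local path, while `dbutils.fs` treats a path without a scheme as a DBFS path and needs the `file:` prefix to reach the driver's local disk. A small sketch of that mapping, assuming a hypothetical `transfer_paths` helper:

```python
import posixpath

LOCAL_TMP = "/tmp"                 # driver-local scratch space used by the download cell
DBFS_TABLES = "/FileStore/tables"  # persistent DBFS folder used by this notebook


def transfer_paths(name):
    """Hypothetical helper: the three spellings of one data file used in this notebook."""
    local = posixpath.join(LOCAL_TMP, name)
    return {
        "local": local,                              # for os.path / open() on the driver
        "local_for_dbutils": "file:" + local,        # same file, as dbutils.fs must address it
        "dbfs": posixpath.join(DBFS_TABLES, name),   # scheme-less dbutils.fs path, i.e. DBFS
    }


paths = transfer_paths("ontimeperformance_airlines.csv")
for kind, path in paths.items():
    print(kind, path)
```

Only the `"dbfs"` location survives cluster termination, which is why the move step targets it.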


### 3. Check the files in Databricks FileStore.
Use the cell below to explore the DBFS directories and check that the files are where they should be and contain the correct data.

In [0]:
display(dbutils.fs.ls("/FileStore/tables"))
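# dbutils.fs.head returns a string, so print it; show only the first 1000 bytes of the large file
print(dbutils.fs.head("/FileStore/tables/ontimeperformance_flights_large.csv", 1000))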
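`dbutils.fs.head` shows raw bytes. To check that a file actually parses as CSV with the expected header, a small sketch with Python's standard `csv` module; it assumes a hypothetical `peek_csv` helper and that the cluster exposes DBFS to local file APIs under `/dbfs` (not every cluster type does). The demo below uses a throwaway local file with placeholder columns:

```python
import csv
import itertools
import os
import tempfile


def peek_csv(path, n_rows=3):
    """Return the header and the first n_rows data rows of a CSV file."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader, [])  # empty list if the file is empty
        rows = list(itertools.islice(reader, n_rows))
    return header, rows


# On Databricks the path would typically be "/dbfs/FileStore/tables/<name>.csv"
# (assumes the /dbfs mount is available); here we use a local demo file.
demo = os.path.join(tempfile.mkdtemp(), "demo.csv")
with open(demo, "w", newline="") as f:
    f.write("col_a,col_b\n1,x\n2,y\n3,z\n4,w\n")

header, rows = peek_csv(demo, n_rows=2)
print(header)  # ['col_a', 'col_b']
print(rows)    # [['1', 'x'], ['2', 'y']]
```

`itertools.islice` stops after `n_rows` rows, so only the start of a multi-gigabyte file is read.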

Congratulations, you now have all the datasets you need for Assignment 1.

You can now close this notebook and continue with the Demo notebook, which shows how to access these datasets using SQL.

# STOP HERE #

The next step takes several minutes, and the massive dataset it loads is only needed for the scalability evaluation at the end of your assignment. To start on your assignment tasks, the small, medium and large flight datasets loaded so far are sufficient.

### Download the extra-large file (8 GB), extract it, and move it to DBFS

Only run the next step once you need this massive 8 GB dataset for the actual performance evaluation; it takes a long time.
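The download cell extracts the zip into `/tmp`, and the move cell afterwards expects the extracted file to be called `ontimeperformance_flights_massive.csv`. Before moving it, you could list what the archive actually contains. A sketch using the standard `zipfile` module, with a hypothetical `csv_members` helper and a tiny in-memory archive standing in for the real download; on the cluster you would pass the downloaded zip's path (`"/tmp/" + large_files["name"]`) instead:

```python
import io
import zipfile


def csv_members(zip_source):
    """List the .csv entries in a zip archive (path or file-like object)."""
    with zipfile.ZipFile(zip_source) as z:
        return [info.filename for info in z.infolist()
                if not info.is_dir() and info.filename.lower().endswith(".csv")]


# Build a tiny in-memory archive to stand in for the real download.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as z:
    z.writestr("ontimeperformance_flights_massive.csv", "a,b\n1,2\n")
    z.writestr("README.txt", "not a csv")

members = csv_members(buffer)
print(members)  # ['ontimeperformance_flights_massive.csv']
```

Reading the archive's directory is cheap: `infolist()` only parses the central directory, so no file data is decompressed.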

In [0]:

if os.path.exists("/tmp/{}".format(large_files["name"])):
  os.remove("/tmp/{}".format(large_files["name"]))
dbutils.fs.rm(f"{dbfs_fileStore_prefix}/{prefix}_flights_massive.csv")

GoogleDriveDownloader.download_file_from_google_drive(file_id=large_files["file_id"],
                                                      dest_path="/tmp/{}".format(large_files["name"]),
                                                      unzip=True)

# dbutils.fs.mv("file:/tmp/8000mb_sample_flightdata.csv", f"{dbfs_fileStore_prefix}/{prefix}_flights_massive.csv")


In [0]:
dbutils.fs.mv("file:/tmp/ontimeperformance_flights_massive.csv", f"{dbfs_fileStore_prefix}/{prefix}_flights_massive.csv")
