In this notebook, I use the Fitbit Kaggle dataset and load it into my personal BigQuery project in order to carry out the data cleaning process.

I could use plain python/pandas code, but I want to show what I can do with SQL and BigQuery. 

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily build the full path of every file found under the Kaggle input
# directory (walking sub-directories too), then print the paths one per line.
input_paths = (
    os.path.join(folder, name)
    for folder, _subfolders, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Loading the Kaggle dataset into BigQuery

This step is not strictly needed, since I could clean and analyze the dataset in Python, but I want to practice with the new tools that I learned. Therefore, I will upload the Kaggle dataset to my personal Google Cloud Services (GCS) area.

Reminder: link your Kaggle notebook to Google Cloud Services in the menu `Add-ons > Google Cloud Services`.

I have a test project in my GCS area called `test-project-306614` where I will upload the datasets. 

In [None]:
from google.cloud import bigquery

# Identifiers of the Google Cloud project and of the BigQuery dataset that the
# queries in this notebook read from.
projectID = 'test-project-306614'
datasetID = 'Fitabase'

# BigQuery client bound to the project above; the cells below send their
# queries through it.
# NOTE(review): `location` is passed explicitly; unclear whether it is required — confirm.
client = bigquery.Client(location='US', project=projectID)

# One-off setup, intentionally left disabled: run once to create the dataset.
# client.create_dataset(datasetID)

The next cell creates the tables in the `datasetID` dataset and loads the CSV files into them. It should be run only once, which is why it is commented out here.

In [None]:
from google.api_core.exceptions import AlreadyExists, Conflict

# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         tableName = filename.replace('_merged.csv','')
#         try: client.create_table(f"{projectID}.{datasetID}.{tableName}")
#         except Conflict: print(f'Table {tableName} already created')
        
#         datasetRef = client.dataset(datasetID)
#         tableRef = datasetRef.table(tableName)
#         jobConfig = bigquery.LoadJobConfig()
        
#         #### the fitbit dataset contains date columns using am/pm format, which BigQuery 
#         #### has problems identifying. I am using a quick hack using pandas here.
#         tmpDf = pd.read_csv(os.path.join(dirname, filename))
#         for iCol in tmpDf.columns.to_list():
#             if iCol.endswith(('Minute', 'Hour', 'date', 'Time', 'Date', 'Day')): 
#                 tmpDf[iCol] = pd.to_datetime(tmpDf[iCol])
#         job = client.load_table_from_dataframe(tmpDf, tableRef, job_config=jobConfig)
#         job.result()
        
#         print("Loaded {} rows into {}:{}.".format(job.output_rows, datasetID, tableName))

After creating the dataset in BigQuery, let's retrieve it and list its tables before cleaning.

In [None]:
# Fetch the dataset, then materialise its table listing into a list so that it
# can be iterated more than once.
dataset = client.get_dataset(datasetID)
tables = [*client.list_tables(dataset)]

# Show the id of every table in the dataset, one per line.
for table in tables:
    print(table.table_id)

Simple test to see if the queries are working:

In [None]:
# Smoke test: select every row of the dailyActivity table to confirm that
# queries against the dataset work.
activity_table = f"{projectID}.{datasetID}.dailyActivity"
query = f""" SELECT *
        FROM `{activity_table}`"""

# Submit the query, then download its result as a pandas DataFrame.
query_job = client.query(query)
data = query_job.to_dataframe()

# Preview the first rows of the result.
data.head()

Checking the number of dates per Id.

In [None]:
# Count the recorded activity dates of each participant and list the
# participants with the fewest dates first.
activity_table = f"{projectID}.{datasetID}.dailyActivity"
query = f""" 
SELECT 
    CAST(Id AS INT) AS newId,
    COUNT(ActivityDate) as count
FROM `{activity_table}`
GROUP BY newId
ORDER BY count ASC
"""

# Run the query and display the per-participant counts.
query_job = client.query(query)
data = query_job.to_dataframe()
data

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt 

# Fetch one (participant, day) pair per activity record.
# NOTE(review): the id is cast to STRING, presumably so that the plot treats it
# as a category rather than as a number — confirm.
activity_table = f"{projectID}.{datasetID}.dailyActivity"
query = f""" 
SELECT 
    CAST(Id AS STRING) AS newId, 
    CAST(ActivityDate AS DATE) AS Date
FROM `{activity_table}`
"""
query_job = client.query(query)
data = query_job.to_dataframe()

# Scatter plot with the dates on the x axis and the participant ids on the y
# axis, which makes gaps in each participant's records visible.
plt.figure(figsize=(10, 10))
sns.scatterplot(x='Date', y='newId', data=data)

Let's create tables that exclude the ids 4057192912, 2347167796, 8253242879 and 3372868164 and, according to the plot above, keep only the dates between `2016-04-12` and `2016-05-07`, for which all the participants have data.

In [None]:
# from google.api_core.exceptions import AlreadyExists, Conflict

# for table in tables: 
#     print('Processing table: ', table.table_id)
#     tableName = table.table_id+'_cleaned'
#         try: client.create_table(f"{projectID}.{datasetID}.{tableName}")
#         except Conflict: print(f'Table {tableName} already created')
        
#         datasetRef = client.dataset(datasetID)
#         tableRef = datasetRef.table(tableName)
#         jobConfig = bigquery.LoadJobConfig()
        
#         query = f""" 
# SELECT * 
# FROM `{projectID}.{datasetID}.{table.table_id}`
# WHERE ActivityDa
# """
# query_job = client.query(query)
# data = query_job.to_dataframe()
        
# #         #### the fitbit dataset contains date columns using am/pm format, which BigQuery 
# #         #### has problems identifying. I am using a quick hack using pandas here.
# #         tmpDf = pd.read_csv(os.path.join(dirname, filename))
# #         for iCol in tmpDf.columns.to_list():
# #             if iCol.endswith(('Minute', 'Hour', 'date', 'Time', 'Date', 'Day')): 
# #                 tmpDf[iCol] = pd.to_datetime(tmpDf[iCol])
# #         job = client.load_table_from_dataframe(tmpDf, tableRef, job_config=jobConfig)
# #         job.result()
        
# #         print("Loaded {} rows into {}:{}.".format(job.output_rows, datasetID, tableName))

Checking whether the tables contain any `nan` (null) values:

In [None]:
# Report, for every table in the dataset, how many NULLs each column holds.
# Each row is serialised to JSON and the names of the keys whose value is
# `null` are extracted with a regex, so no column list has to be hard-coded.
for table in tables:
    print('Checking table: ', table.table_id)
    # Raw f-string: the SQL regex contains `\w`, which is an invalid escape
    # sequence in a regular (non-raw) Python string literal and triggers a
    # warning on recent Python versions. The text sent to BigQuery is unchanged.
    query = rf"""
SELECT col_name, COUNT(1) AS null_count
FROM `{projectID}.{datasetID}.{table.table_id}` AS t,
UNNEST(REGEXP_EXTRACT_ALL(TO_JSON_STRING(t), r'"(\w+)":null')) col_name
GROUP BY col_name
"""

    query_job = client.query(query)
    data = query_job.to_dataframe()
    # Only columns with at least one NULL produce a row in the result.
    display(data)

In [None]:
# Show the weight-log entries whose `Fat` value is greater than zero.
weight_table = f"{projectID}.{datasetID}.weightLogInfo"
query = f""" 
SELECT *
FROM `{weight_table}`
WHERE Fat > 0
"""

# Run the query and display the matching rows.
query_job = client.query(query)
data = query_job.to_dataframe()
data

Checking if there are duplicate rows in tables:

In [None]:
# For each table, put the number of distinct rows next to the total number of
# rows; the two counts differ when the table holds duplicate rows.
for table in tables:
    table_path = f"{projectID}.{datasetID}.{table.table_id}"
    print('Checking table: ', table.table_id)
    query = f""" 
SELECT
    (SELECT COUNT(1) FROM (SELECT DISTINCT * FROM `{table_path}`)) AS distinct_rows,
    (SELECT COUNT(1) FROM `{table_path}`) AS total_rows
"""
    # Run the comparison and show the two counts for this table.
    query_job = client.query(query)
    data = query_job.to_dataframe()
    display(data)

Finally, just for fun, I merged all the daily information into one `csv` file.

In [None]:
# Merge the daily activity table with the matching sleep and weight records
# (left joins, so every activity row is kept) and save the result as a CSV
# file in the working directory.
#
# The joins compare calendar days (CAST ... AS DATE) instead of full DATETIME
# values: the loader parses these columns as datetimes, so a sleep or weight
# record that carries a time of day would never equal the activity date and
# its columns would silently come back as NULL.
query = f"""
WITH dailyActivity AS (
    SELECT
        dailyActivity.*,
        dailySleep.* EXCEPT(Id, SleepDay)
    FROM `{projectID}.{datasetID}.dailyActivity` AS dailyActivity
    LEFT JOIN `{projectID}.{datasetID}.sleepDay` AS dailySleep
        ON dailyActivity.Id = dailySleep.Id
        AND CAST(dailyActivity.ActivityDate AS DATE) = CAST(dailySleep.SleepDay AS DATE)
)
SELECT
    dailyActivity.*,
    dailyWeight.* EXCEPT(Id, Date)
FROM dailyActivity
LEFT JOIN `{projectID}.{datasetID}.weightLogInfo` AS dailyWeight
    ON dailyActivity.Id = dailyWeight.Id
    AND CAST(dailyActivity.ActivityDate AS DATE) = CAST(dailyWeight.Date AS DATE)
"""

# Set up the query
query_job = client.query(query)

# API request - run the query, and return a pandas DataFrame
data = query_job.to_dataframe()

# Persist the merged table (without the DataFrame index) and preview it.
data.to_csv('dailyActivity_allmerged.csv', index=False)
data.head()