In [1]:
import pandas as pd
import numpy as np

In [2]:
from google.cloud import bigquery
from google.oauth2 import service_account
import os
from dotenv import load_dotenv

# Load environment variables via python-dotenv before they are read below.
load_dotenv()

# Location of the service-account key file; os.environ[...] raises KeyError
# when GCP_BQ_CREDENTIALS is not set.
GCP_BQ_CREDENTIALS_FILE_PATH = os.environ['GCP_BQ_CREDENTIALS']

credentials = service_account.Credentials.from_service_account_file(
    GCP_BQ_CREDENTIALS_FILE_PATH,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# The client is bound to the project id carried by the credentials object.
client = bigquery.Client(
    project=credentials.project_id,
    credentials=credentials,
)

In [3]:
# One `httparchive.summary_requests` mobile table per year, 2018 through 2023;
# every name carries the same `_02_01` date suffix.
first_in_year_tables = [
    f'httparchive.summary_requests.{year}_02_01_mobile'
    for year in range(2018, 2024)
]

In [4]:
# BigQuery SQL template that extracts NEL (`nel`) and `report-to` header data
# from one table.
#
# The two `%s` placeholders are both filled with the same table name through
# Python %-formatting (`query_string % (table_name, table_name)`), so any
# literal `%` added to this text would have to be written as `%%`.
#
# Stages, innermost CTE first:
#   filtered_table             - MIN(requestid) per host extracted from `url`.
#   joined_table               - joins those request ids back to the table and
#                                lower-cases `respOtherHeaders`.
#   nel_extracted_table        - adds row counts of joined_table (all rows, and
#                                rows with firstReq = true), the `contains_nel`
#                                flag and the `{...}` text following `nel =`.
#   nel_values_extracted_table - keeps rows with contains_nel = true and pulls
#                                max_age, failure/success fraction,
#                                include_subdomains and report_to out of it.
#   rt_extracted_values_table  - adds the count of those rows and extracts the
#                                report-to object whose group is the NEL
#                                report_to value (`rt_value`).
#   final_extracted_table      - derives rt_group, rt_endpoints, rt_url and
#                                rt_url_sld from `rt_value`.
# The outer SELECT keeps rows where nel_report_to_group equals rt_group and
# neither is NULL.
#
# The Czech TODO inside the SQL comments reads roughly: "check whether all rows
# flagged contains_nel actually have values".
#
# NOTE(review): nel_report_to_group is concatenated into the report-to regex
# without escaping; a group name containing regex metacharacters may alter the
# pattern - confirm whether that needs handling.
query_string = r'''
WITH final_extracted_table AS (
  WITH rt_extracted_values_table AS (
    WITH nel_values_extracted_table AS (
      WITH nel_extracted_table AS (
        WITH joined_table AS (
          WITH filtered_table AS (
            SELECT
            MIN(requestid) min_req_id,
            #COUNTIF(firstReq = false) occurences_count_not_firstreq,
            REGEXP_EXTRACT(url, r"http[s]?:[\/][\/]([^\/:]+)") AS url_domain,
            FROM `%s`
            GROUP BY url_domain
          )
          SELECT
          requestid,
          LOWER(respOtherHeaders) resp_headers,
          status,
          url,
          type,
          ext,
          firstReq,
          FROM filtered_table
          INNER JOIN `%s` ON filtered_table.min_req_id = requestid
        )

        SELECT
        requestid,
        type,
        ext,
        firstReq,
        status,
        url,
        resp_headers,

        (SELECT COUNT(*) FROM joined_table) AS unique_domain_count_before_filtration,
        (SELECT COUNT(*) FROM joined_table WHERE firstReq = true) AS unique_domain_firstreq_count_before_filtration,
        #REGEXP_EXTRACT(url, r"http[s]?:[\/][\/]([^\/:]+)") AS url_domain,
        REGEXP_CONTAINS(resp_headers, r"(?:^|.*[\s,]+)(nel\s*[=]\s*)") AS contains_nel,
        REGEXP_EXTRACT(resp_headers, r"(?:^|.*[\s,]+)nel\s*[=]\s*({.*?})") AS nel_value,

        FROM joined_table
      )
      SELECT
      requestid,
      type,
      ext,
      firstReq,
      status,
      url,
      #url_domain,
      unique_domain_count_before_filtration,
      unique_domain_firstreq_count_before_filtration,
      contains_nel,

      nel_value,
      resp_headers, # debug rm, todo: zceknout jestli jsou vsechny co rikaji ze contains_nel maji values

      # extract nel values
      REGEXP_EXTRACT(nel_value, r".*max_age[\"\']\s*:\s*([0-9]+)") AS nel_max_age,
      REGEXP_EXTRACT(nel_value, r".*failure[_]fraction[\"\']\s*:\s*([0-9\.]+)") AS nel_failure_fraction,
      REGEXP_EXTRACT(nel_value, r".*success[_]fraction[\"\']\s*:\s*([0-9\.]+)") AS nel_success_fraction,
      REGEXP_EXTRACT(nel_value, r".*include[_]subdomains[\"\']\s*:\s*(\w+)") AS nel_include_subdomains,

      REGEXP_EXTRACT(nel_value, r".*report_to[\"\']\s*:\s*[\"\'](.+?)[\"\']") AS nel_report_to_group,

      FROM nel_extracted_table
      WHERE contains_nel = true
    )

    SELECT
    requestid,
    type,
    ext,
    firstReq,
    status,
    url,
    unique_domain_count_before_filtration,
    unique_domain_firstreq_count_before_filtration,
    contains_nel,
    nel_max_age,
    nel_failure_fraction,
    nel_success_fraction,
    nel_include_subdomains,
    nel_report_to_group,
    #nel_value, # debug rm
    #reportto_value, # debug rm
    resp_headers,

    (SELECT COUNT(*) FROM nel_values_extracted_table) AS nel_count_before_filtration,

    REGEXP_EXTRACT(resp_headers, CONCAT(r"report[-]to\s*?[=].*([{](?:(?:[^\{]*?endpoints.*?[\[][^\[]*?[\]][^\}]*?)|(?:[^\{]*?endpoints.*?[\{][^\{]*?[\}]))?[^\]\}]*?group[\'\"][:]\s*?[\'\"]", nel_report_to_group, r"(?:(?:[^\}]*?endpoints[^\}]*?[\[][^\[]*?[\]][^\{]*?)|(?:[^\}]*?endpoints.*?[\{][^\{]*?[\}]))?.*?[}])")) AS rt_value,

    FROM nel_values_extracted_table
  )
  SELECT
  requestid,
  type,
  ext,
  firstReq,
  status,
  url,
  unique_domain_count_before_filtration,
  unique_domain_firstreq_count_before_filtration,
  contains_nel,
  nel_max_age,
  nel_failure_fraction,
  nel_success_fraction,
  nel_include_subdomains,
  nel_report_to_group,
  #nel_value, # debug rm
  rt_value,
  #resp_headers, # debug rm, todo: zceknout jestli jsou vsechny co rikaji ze contains_nel maji values
  nel_count_before_filtration,

  REGEXP_EXTRACT(rt_value, r".*group[\"\']\s*:\s*[\"\'](.+?)[\"\']") AS rt_group,

  REGEXP_EXTRACT_ALL(rt_value, r"url[\"\']\s*:\s*[\"\']http[s]?:[\\]*?[\/][\\]*?[\/]([^\/]+?)[\\]*?[\/\"]") AS rt_endpoints,
  REGEXP_EXTRACT(rt_value, r"url[\"\']\s*:\s*[\"\']http[s]?:[\\]*?[\/][\\]*?[\/]([^\/]+?)[\\]*?[\/\"]") AS rt_url,
  REGEXP_EXTRACT(rt_value, r"url[\"\']\s*:\s*(?:[\"\']http[s]?:[\\]*?[\/][\\]*?[\/].*?([^\.]+?[.][^\.]+?)[\\]*?[\/\"])") AS rt_url_sld,

  FROM rt_extracted_values_table
)

SELECT
requestid,
type,
ext,
firstReq,
status,
url,
unique_domain_count_before_filtration,
unique_domain_firstreq_count_before_filtration,
contains_nel,
nel_max_age,
nel_failure_fraction,
nel_success_fraction,
nel_include_subdomains,
nel_report_to_group,
#nel_value, # debug rm
#rt_value, # debug rm
#resp_headers, # debug rm, todo: zceknout jestli jsou vsechny co rikaji ze contains_nel maji values
nel_count_before_filtration,
rt_group,
rt_endpoints,
rt_url,
rt_url_sld,

FROM final_extracted_table
# filter data to only those that we could parse, and that has both report_to and group matching
# by analysis, the filtered out records contains either not json value, bad formating such as \" for json quotes, or are maybe improperly parsed by previous processing by table creators (no value, missing brackets)
WHERE nel_report_to_group = rt_group and nel_report_to_group is not null and rt_group is not null;

'''

In [5]:
def processing_bytes_estimation(table_list, query_string):
    """Print a dry-run estimate of how much data each table's query processes.

    Each name in ``table_list`` is substituted into both ``%s`` placeholders
    of ``query_string``; the resulting SQL is submitted through the
    module-level ``client`` with ``dry_run=True`` and the query cache turned
    off. One line is printed per table, followed by a blank line and the
    running total. Nothing is returned.
    """
    dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

    # Accumulate with plain ``+=`` (not ``sum``) so the float result is the
    # straightforward left-to-right addition.
    total_mb = 0
    for name in table_list:
        sql = query_string % (name, name)
        dry_run_job = client.query(sql, job_config=dry_run_config)

        table_mb = dry_run_job.total_bytes_processed / 1024 / 1024
        table_gb = table_mb / 1024
        print(f"This query '{name}' will process {table_mb} MB, {table_gb} GB.")
        total_mb += table_mb

    total_gb = total_mb / 1024
    total_tb = total_mb / 1024 / 1024
    print()
    print(f"Total will process {total_mb} MB, {total_gb} GB, {total_tb} TB")

In [6]:
# Dry-run estimate for every table in `first_in_year_tables`; prints one line
# per table plus a total.
processing_bytes_estimation(first_in_year_tables, query_string)

This query 'httparchive.summary_requests.2018_02_01_mobile' will process 12779.238265037537 MB, 12.47972486820072 GB.
This query 'httparchive.summary_requests.2019_02_01_mobile' will process 99190.25790596008 MB, 96.86548623628914 GB.
This query 'httparchive.summary_requests.2020_02_01_mobile' will process 132271.23532485962 MB, 129.17112824693322 GB.
This query 'httparchive.summary_requests.2021_02_01_mobile' will process 200293.90373802185 MB, 195.59951536916196 GB.
This query 'httparchive.summary_requests.2022_02_01_mobile' will process 265011.81181812286 MB, 258.8005974786356 GB.
This query 'httparchive.summary_requests.2023_02_01_mobile' will process 519405.28375053406 MB, 507.2317224126309 GB.

Total will process 1228951.730802536 MB, 1200.1481746118516 GB, 1.1720197017693863 TB


In [7]:
def run_queries_store_data(table_list, query_string, output_dir="results_mobile_all_feb") -> list:
    """Run the query for every table and store each result as a parquet file.

    Args:
        table_list: Fully qualified table names; each one is substituted into
            both ``%s`` placeholders of ``query_string``.
        query_string: SQL template containing exactly two ``%s`` placeholders.
        output_dir: Directory that receives the parquet files. It is created
            when missing. Defaults to the directory the notebook used before
            this parameter existed.

    Returns:
        The list of query jobs, in the order of ``table_list``.

    Each file is named ``<table_name>.<project>.<dataset>.<table>.parquet``,
    where the last three parts identify the job's destination table.
    Queries are submitted through the module-level ``client``.
    """
    # Create the directory before submitting anything, so a missing directory
    # cannot make ``to_parquet`` fail after a query has already been run.
    os.makedirs(output_dir, exist_ok=True)

    job_config = bigquery.QueryJobConfig()
    job_list = []

    for table_name in table_list:
        query_job = client.query(
            query_string % (table_name, table_name),
            job_config=job_config,
        )

        # Public accessor for the table the results are written to; replaces
        # the lookup through the job's private ``_properties`` dict.
        destination = query_job.destination
        dst_table_name = f"{destination.project}.{destination.dataset_id}.{destination.table_id}"

        result_df = query_job.to_dataframe()
        result_df.to_parquet(
            os.path.join(output_dir, f"{table_name}.{dst_table_name}.parquet")
        )
        job_list.append(query_job)

    return job_list

In [8]:
# Run the query against every table in `first_in_year_tables` (not a dry run);
# each result is written to a parquet file and the query jobs are kept.
job_list = run_queries_store_data(first_in_year_tables, query_string)

In [9]:
# Show the query jobs that were run (their repr lists project, location and id).
print(job_list)

[QueryJob<project=httparchive-analysis-376406, location=US, id=16066179-45d4-45f4-bcde-000d447f7f1f>, QueryJob<project=httparchive-analysis-376406, location=US, id=e7fdf661-30db-4a15-9d3e-ce2b44134596>, QueryJob<project=httparchive-analysis-376406, location=US, id=84070d92-00ed-4dab-8ec5-5d703eae92b4>, QueryJob<project=httparchive-analysis-376406, location=US, id=6ac82781-da67-4963-932b-18048caeb0fc>, QueryJob<project=httparchive-analysis-376406, location=US, id=2cbb080e-fc15-4e3b-b9dc-ce165579a7d7>, QueryJob<project=httparchive-analysis-376406, location=US, id=0465611c-3388-47b1-a0cf-84f4f9179309>]


In [10]:
# NOTE(review): prints a fixed placeholder string only; looks like a leftover
# scratch cell - consider removing.
print("asdf")

asdf
