<a href="https://colab.research.google.com/github/Milafreire/web_scrap_parquet_to_bq/blob/main/web_scrap_storage_to_bq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache-beam[interactive,dataframe,gcp]

Collecting apache-beam[dataframe,gcp,interactive]
  Downloading apache_beam-2.56.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.5/14.5 MB[0m [31m40.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting crcmod<2.0,>=1.7 (from apache-beam[dataframe,gcp,interactive])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[dataframe,gcp,interactive])
  Downloading orjson-3.10.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (142 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m142.5/142.5 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[dataframe,gcp,interactive])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     

## Configuration

In [2]:
import pyarrow as pq
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
from apache_beam.transforms.sql import SqlTransform
from apache_beam.io import ReadFromParquet

In [3]:
# Authenticate this Colab session against Google Cloud; the Beam and BigQuery
# calls later in the notebook run with these user credentials.
from google.colab import auth

auth.authenticate_user()
print('Authenticated')

# GCP project ID exported for the Google Cloud client libraries used below.
os.environ['GOOGLE_CLOUD_PROJECT'] = 'mywebscrap-423316'

Authenticated


## Methods

## Pipeline Options


In [13]:
# Beam pipeline options. Only the GCP project is configured here; each stage
# receives its GCS temp location as an explicit argument instead.
pipeline_options = PipelineOptions.from_dictionary({'project': 'mywebscrap-423316'})
pipeline = beam.Pipeline(options=pipeline_options)

## Pipeline stages (executed by the last cell)

In [14]:
def insert_scrap_data_to_bq(pipeline, table_scrap, temp_location, input_file=None):
    """Load the scraped Parquet files from Cloud Storage into a BigQuery table.

    The destination table is created if needed and truncated before writing.
    The pipeline is run to completion before this function returns.

    Args:
        pipeline: Beam pipeline the read/write transforms are attached to.
            ``pipeline.run()`` is called on it, so any transforms already added
            to it are executed as well.
        table_scrap: Destination BigQuery table spec ('project.dataset.table').
        temp_location: GCS path used as the temp location for the BigQuery write.
        input_file: Parquet file pattern to read. When omitted, the notebook-level
            ``input_file`` global is used (the previous behaviour).

    Raises:
        ValueError: if no ``input_file`` is passed and no such global is defined.
    """
    if input_file is None:
        # Backward compatibility: this function used to read the global set in
        # the "main" cell instead of taking the path as a parameter.
        input_file = globals().get('input_file')
    if not input_file:
        raise ValueError(
            'insert_scrap_data_to_bq: pass input_file or define the input_file global')

    schema = (
        'id:INTEGER, product_name:STRING, bar_code:STRING, category:STRING, '
        'price:FLOAT, qty_stock:INTEGER, star_rating:STRING, extraction_date:DATE'
    )

    web_scrap_data = (
        pipeline
        | 'Read Parquet From Cloud Storage' >> beam.io.ReadFromParquet(input_file)
    )

    # This label differs from the one used in category_analise ('Write To BigQuery'),
    # so both stages can be attached to the same pipeline without a label clash.
    web_scrap_data | 'Write to Big Query' >> beam.io.WriteToBigQuery(
        table_scrap,
        schema=schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        custom_gcs_temp_location=temp_location,
    )
    pipeline.run().wait_until_finish()

In [8]:
def category_analise(pipeline, table_analise, temp_location,
                     source_table='mywebscrap-423316.estudos_gcp.books_to_scrap'):
    """Compute per-category price and rating statistics and write them to BigQuery.

    Runs a standard-SQL aggregation over ``source_table``, grouped by category.
    For each category it computes min, approximate median and max of ``price``
    and of the numeric star rating. It also picks the highest-priced and the
    highest-rated product, with ties broken by product name. The result
    replaces the contents of ``table_analise``. The pipeline is run to
    completion before this function returns.

    Args:
        pipeline: Beam pipeline the transforms are attached to. ``pipeline.run()``
            is called on it, so any transforms already in it are executed too.
        table_analise: Destination BigQuery table spec ('project.dataset.table').
        temp_location: GCS path used for the query read and the BigQuery write.
        source_table: Fully-qualified table holding the scraped books. It defaults
            to the table the main cell loads as ``table_scrap``.
    """
    # star_rating holds words ('One'..'Five'). Map them to 1..5 so the column can
    # be aggregated and ordered numerically. Any other value maps to NULL (no ELSE).
    star_rating_num = (
        "CASE star_rating WHEN 'One' THEN 1 WHEN 'Two' THEN 2 WHEN 'Three' THEN 3 "
        "WHEN 'Four' THEN 4 WHEN 'Five' THEN 5 END"
    )

    # source_table is notebook configuration, not user input, so interpolating
    # it into the SQL text is acceptable here.
    query = f'''
    SELECT
        category,
        MIN(price) AS min_price,
        APPROX_QUANTILES(price, 100)[OFFSET(50)] AS median_price,
        MAX(price) AS max_price,
        MIN({star_rating_num}) AS min_star_rating,
        APPROX_QUANTILES({star_rating_num}, 100)[OFFSET(50)] AS median_star_rating,
        MAX({star_rating_num}) AS max_star_rating,
        ARRAY_AGG(product_name ORDER BY price DESC, product_name LIMIT 1)[OFFSET(0)] AS product_highest_price,
        ARRAY_AGG(product_name ORDER BY {star_rating_num} DESC, product_name LIMIT 1)[OFFSET(0)] AS product_highest_rating
    FROM
        `{source_table}`
    GROUP BY
        category;
    '''

    execute_sql = (
        pipeline
        | 'Execute SQL Query' >> beam.io.ReadFromBigQuery(
            query=query, use_standard_sql=True, gcs_location=temp_location)
    )

    execute_sql | 'Write To BigQuery' >> beam.io.WriteToBigQuery(
        table_analise,
        # Let BigQuery infer the column types of the aggregated result.
        schema='SCHEMA_AUTODETECT',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        custom_gcs_temp_location=temp_location,
    )
    # Block like insert_scrap_data_to_bq does, so the cell only finishes once the
    # analytics table has actually been written.
    pipeline.run().wait_until_finish()

In [15]:
 # Call Pipeline and pass parameters
if __name__ == '__main__':
    # NOTE(review): `url` is not referenced in this part of the notebook;
    # presumably it is the scraper's start page. Confirm it is still needed.
    url = 'https://books.toscrape.com/catalogue/page-1.html'
    input_file = 'gs://myscraptest/parquet_files/scrap_data'
    table_scrap = 'mywebscrap-423316.estudos_gcp.books_to_scrap'
    table_analise = 'mywebscrap-423316.estudos_gcp.category_analytics'
    temp_location = 'gs://myscraptest/temp/'

    # Give each stage its own Pipeline built from the shared options. Reusing the
    # single `pipeline` object made the second run() execute the Parquet load
    # again. It also made re-running this cell fail with duplicate transform labels.
    insert_scrap_data_to_bq(beam.Pipeline(options=pipeline_options), table_scrap, temp_location)
    category_analise(beam.Pipeline(options=pipeline_options), table_analise, temp_location)