<a href="https://colab.research.google.com/github/Milafreire/Exercicios/blob/master/Web_Scrap_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Configs

In [34]:
# Install the HTTP client and HTML parser used by the scraper.
!pip install requests beautifulsoup4
# Install Apache Beam with its interactive, dataframe and GCP extras.
# NOTE(review): versions are unpinned, so a later re-run may resolve to
# different releases — consider pinning them.
!pip install apache-beam[interactive,dataframe,gcp]



In [41]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from google.cloud import bigquery
from apache_beam.transforms.sql import SqlTransform
# # In case the intention is to work with Parquet
# from google.cloud import storage
# from apache_beam.io import ReadFromParquet
# from apache_beam.io.parquetio import ReadFromParquet, WriteToParquet

In [42]:
# Authenticate this Colab session against Google Cloud.
from google.colab import auth

auth.authenticate_user()
print('Authenticated')

# Expose the GCP project id through the process environment.
os.environ.update({"GOOGLE_CLOUD_PROJECT": 'mywebscrap-423316'})

Authenticated


## Methods

In [43]:
class RenameColumns(beam.DoFn):
    """DoFn that renames the keys of each dict element.

    Keys found in ``column_mapping`` are replaced by their mapped name; any
    other key is passed through unchanged. Values are never modified.
    """

    def __init__(self, column_mapping):
        """Store the ``{old_name: new_name}`` mapping used by ``process``."""
        self.column_mapping = column_mapping

    def process(self, element):
        """Yield a new dict holding ``element``'s values under the renamed keys."""
        renamed = {}
        for old_key, value in element.items():
            # Fall back to the original key when no rename is configured.
            new_key = self.column_mapping.get(old_key, old_key)
            renamed[new_key] = value
        yield renamed

In [44]:
# Source column name -> destination column name.
# Only 'upc' and 'starrating' actually change; the remaining entries map a
# column to itself.
columns_to_rename = dict(
    id='id',
    title='title',
    upc='bar_code',
    price='price',
    category='category',
    qty_stock='qty_stock',
    starrating='star_rating',
)

## Pipeline Options


In [88]:
# # Config Pipeline Options
# pipeline_options = {
# 'project': 'mywebscrap-423316'
# }
# pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
# pipeline = beam.Pipeline(options=pipeline_options)

In [95]:
# Build the Beam pipeline options (project, runner, region and GCS locations)
# and create the pipeline object from them.
# NOTE(review): 'template_location' is set; on Dataflow this is documented to
# stage a template instead of launching the job — confirm that is intended.
pipeline_options = PipelineOptions.from_dictionary({
    'project': 'mywebscrap-423316',
    'runner': 'DataflowRunner',
    'region': 'us-central1',
    'staging_location': 'gs://myscraptest/temp/staging_location/',
    'temp_location': 'gs://myscraptest/temp/temp_location/',
    'template_location': 'gs://myscraptest/temp/template_location/',
})
pipeline = beam.Pipeline(options=pipeline_options)

 # Call Pipeline and pass parameters

In [92]:
class ScrapeBooksFn(beam.DoFn):
    """Crawl a paginated book catalogue and emit one record per book.

    For every listing page, each ``article.product_pod`` entry is followed to
    its detail page, from which the UPC, price (excl. tax), stock count,
    category and star rating are read. Pagination follows the ``li.next``
    link until a page has none.
    """

    # Upper bound, in seconds, on how long a single HTTP request may block.
    REQUEST_TIMEOUT = 30

    def process(self, url):
        """Yield one dict per book, starting from the listing page at ``url``.

        Args:
            url: Absolute URL of the first listing page to crawl.

        Yields:
            dict with keys ``title``, ``bar_code``, ``category``, ``price``
            (float), ``qty_stock`` (int) and ``star_rating`` (the second CSS
            class of the page's ``star-rating`` paragraph).

        Raises:
            requests.RequestException: if a request times out, fails, or is
                answered with an HTTP error status.
            ValueError: if the price cell contains no number.
        """
        # Function-scope imports keep this DoFn self-contained instead of
        # depending on notebook globals being present where it executes.
        import requests
        from urllib.parse import urljoin

        # One session for the whole crawl so connections are reused.
        with requests.Session() as session:
            while url:
                listing = self._fetch_soup(session, url)

                for book in listing.find_all('article', class_='product_pod'):
                    anchor = book.find('h3').find('a')
                    # hrefs are relative to the page that contains them, so
                    # resolve against the current page, not a fixed prefix.
                    details_url = urljoin(url, anchor.get('href'))
                    details = self._fetch_soup(session, details_url)

                    yield {
                        'title': anchor.get('title'),
                        'bar_code': self._table_value(details, 'UPC'),
                        # Third breadcrumb link is used as the category.
                        'category': details.find('ul', class_='breadcrumb').find_all('a')[2].text,
                        'price': self._parse_price(
                            self._table_value(details, 'Price (excl. tax)')),
                        'qty_stock': self._parse_stock(
                            self._table_value(details, 'Availability')),
                        'star_rating': details.find('p', {'class': 'star-rating'})['class'][1],
                    }

                next_link = listing.find('li', class_='next')
                url = urljoin(url, next_link.find('a')['href']) if next_link else None

    def _fetch_soup(self, session, url):
        """Download ``url`` and return it parsed; fail fast on HTTP errors."""
        from bs4 import BeautifulSoup

        response = session.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        # Parse the raw bytes so the parser picks the encoding from the
        # document itself; decoding with a guessed charset is what produced
        # the stray 'Â' in front of the pound sign.
        return BeautifulSoup(response.content, 'html.parser')

    @staticmethod
    def _table_value(soup, label):
        """Return the text of the ``td`` next to the ``th`` whose text is ``label``."""
        return soup.find('th', string=label).find_next_sibling('td').text

    @staticmethod
    def _parse_price(price_text):
        """Extract the numeric amount from a price string such as '£51.77'."""
        import re

        match = re.search(r'\d+(?:\.\d+)?', price_text)
        if match is None:
            raise ValueError(f'No numeric price found in {price_text!r}')
        return float(match.group())

    @staticmethod
    def _parse_stock(stock_text):
        """Extract the count from text like 'In stock (22 available)'.

        Returns 0 when the text contains no number, instead of crashing.
        """
        import re

        match = re.search(r'\d+', stock_text)
        return int(match.group()) if match else 0

In [93]:
def insert_data():
    """Scrape the catalogue and load the resulting rows into BigQuery.

    Reads the module-level names ``url`` (first listing page), ``table_scrap``
    (destination table), ``temp_location`` (GCS path for load files) and
    ``pipeline_options``. The destination table is created if needed and
    truncated before writing.

    A dedicated pipeline is built on every call, so the function can be
    re-run without hitting duplicate transform labels, and leaving the
    ``with`` block runs the pipeline and waits for it to finish before
    returning.
    """
    schema = 'title:STRING, bar_code:STRING, category:STRING, price:FLOAT, qty_stock:INTEGER, star_rating:STRING'

    with beam.Pipeline(options=pipeline_options) as scrape_pipeline:
        # ScrapeBooksFn already emits the final column names used in
        # ``schema``, so no rename step is applied here.
        (
            scrape_pipeline
            | 'Start URL' >> beam.Create([url])
            | 'Scrape Books' >> beam.ParDo(ScrapeBooksFn())
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table_scrap,
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                custom_gcs_temp_location=temp_location,
            )
        )

In [75]:
def create_analise():
  """Aggregate the scraped books per category and store the result in BigQuery.

  The query computes, for each category: minimum, approximate median and
  maximum price; minimum, approximate median and maximum star rating (rating
  words mapped to 1-5); and the title with the highest price and with the
  highest rating.

  Reads the module-level names ``table_analise`` (destination table),
  ``temp_location`` (GCS scratch path) and ``pipeline_options``. The
  destination table is created if needed and truncated before writing.

  A dedicated pipeline is built on every call so that only the read and the
  write below are executed, rather than every transform previously attached
  to a shared pipeline. Leaving the ``with`` block runs the pipeline and
  waits for it to finish.
  """
  query = '''
  SELECT
      category,
      MIN(price) AS min_price,
      APPROX_QUANTILES(price, 100)[OFFSET(50)] AS median_price,
      MAX(price) AS max_price,
      MIN(CASE star_rating WHEN 'One' THEN 1 WHEN 'Two' THEN 2 WHEN 'Three' THEN 3 WHEN 'Four' THEN 4 WHEN 'Five' THEN 5 END) AS min_star_rating,
      APPROX_QUANTILES(CASE star_rating WHEN 'One' THEN 1 WHEN 'Two' THEN 2 WHEN 'Three' THEN 3 WHEN 'Four' THEN 4 WHEN 'Five' THEN 5 END, 100)[OFFSET(50)] AS median_star_rating,
      MAX(CASE star_rating WHEN 'One' THEN 1 WHEN 'Two' THEN 2 WHEN 'Three' THEN 3 WHEN 'Four' THEN 4 WHEN 'Five' THEN 5 END) AS max_star_rating,
      ARRAY_AGG(title ORDER BY price DESC, title LIMIT 1)[OFFSET(0)] AS title_highest_price,
      ARRAY_AGG(title ORDER BY CASE star_rating WHEN 'One' THEN 1 WHEN 'Two' THEN 2 WHEN 'Three' THEN 3 WHEN 'Four' THEN 4 WHEN 'Five' THEN 5 END DESC, title LIMIT 1)[OFFSET(0)] AS title_highest_rating
  FROM
      `mywebscrap-423316.estudos_gcp.books_to_scrap`
  GROUP BY
      category;
  '''

  with beam.Pipeline(options=pipeline_options) as analytics_pipeline:
    (
      analytics_pipeline
      | 'Execute SQL Query' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True, gcs_location=temp_location)
      | 'Write To BigQuery' >> beam.io.WriteToBigQuery(
          table_analise,
          schema='SCHEMA_AUTODETECT',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
          custom_gcs_temp_location=temp_location,
      )
    )

In [96]:
 # Call Pipeline and pass parameters
if __name__ == '__main__':
    url = 'https://books.toscrape.com/catalogue/page-1.html'
    input_file = 'gs://myscraptest/books_df.parquet'
    table_scrap='mywebscrap-423316.estudos_gcp.books_to_scrap'
    table_analise='mywebscrap-423316.estudos_gcp.category_analytics'
    temp_location = 'gs://myscraptest/temp/'
    insert_data()
    create_analise()