
Where I got the idea for this project: https://www.youtube.com/watch?v=HiOtQMcI5wg&t=1614s

Beautiful Soup documentation:  https://www.crummy.com/software/BeautifulSoup/bs4/doc/#id12 

Some basic headers for the requests module can be found at: http://httpbin.org/get

In [1]:
# importing the required python packages
from bs4 import BeautifulSoup as bs
import requests
import time
import datetime
import smtplib
import pandas as pd
import csv
from datetime import datetime
import random
from itertools import cycle
from lxml.html import fromstring
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.backends.backend_pdf

In [2]:
def get_proxies(limit=10, timeout=10):
    """Scrape a set of free proxy addresses from free-proxy-list.net.

    Only the first ``limit`` table rows are inspected, and a row is kept only
    when its seventh cell contains the text "yes" (presumably the site's
    HTTPS-support column -- confirm against the live page layout).

    parameters:
               - limit : int, how many table rows to inspect (default 10)
               - timeout : seconds to wait for the proxy list page before
                 giving up, so a stalled connection cannot hang the scraper
    returns:
               - Set of "ip:port" strings; may be empty when none of the
                 inspected rows qualifies
    raises:
               - requests.exceptions.RequestException if the proxy list page
                 cannot be fetched
    """
    url = 'https://free-proxy-list.net/'
    response = requests.get(url, timeout=timeout)
    parser = fromstring(response.text)
    proxies = set()
    for row in parser.xpath('//tbody/tr')[:limit]:
        if not row.xpath('.//td[7][contains(text(),"yes")]'):
            continue
        host = row.xpath('.//td[1]/text()')
        port = row.xpath('.//td[2]/text()')
        # Skip malformed rows instead of crashing on a missing cell.
        if host and port:
            proxies.add(":".join([host[0], port[0]]))
    return proxies

In [3]:
def _fetch_through_proxy(URL, headers, proxy, timeout):
    """Request URL through one proxy; return the parsed soup, or None when the
    request fails or the page carries the "Something went wrong!" marker."""
    try:
        response = requests.get(
            URL,
            headers=headers,
            proxies={"http": proxy, "https": proxy},
            timeout=timeout,
        )
    except requests.exceptions.RequestException:
        # Only network/proxy failures are skipped; KeyboardInterrupt and
        # programming errors still propagate so the retry loop can be stopped.
        print("Skipping. Connnection error")
        return None

    # Parse once and reuse the result for both the block check and the return.
    Soup = bs(response.text, "html.parser")
    # check for the static value that is returned from the web if the request is blocked
    if "Something went wrong!" in Soup.text:
        print("no luck")
        return None
    print("lucky")
    return Soup


def get_winning_proxy(URL, headers, proxies, prev_winner, timeout=15):
    """Keep trying proxies until one returns an unblocked copy of URL.

    Each round first retries ``prev_winner`` (when one is given), then makes
    up to ten attempts cycling through ``proxies``. If the whole round fails
    it sleeps 60 seconds, refreshes the proxy list with get_proxies() and
    starts over. The retry is a loop rather than recursion, so a result found
    after a retry is actually returned to the caller and the call stack does
    not grow while waiting for a working proxy.

    parameters:
               - URL : String, the web page address we are scraping
               - headers : Dictionary, the header values passed with every request
               - proxies : Set, the proxies returned from get_proxies()
               - prev_winner : String, the proxy that worked last time, or 0
                 (any falsy value) when there is none yet
               - timeout : seconds allowed per request before the proxy is skipped
    returns:
               - [Soup, proxy] : the BeautifulSoup object for the page and the
                 proxy address that successfully fetched it
    """
    while True:
        if prev_winner:
            Soup = _fetch_through_proxy(URL, headers, prev_winner, timeout)
            if Soup is not None:
                return [Soup, prev_winner]

        # An empty proxy collection would make next(cycle(...)) raise
        # StopIteration, so skip straight to the refresh in that case.
        if proxies:
            proxy_pool = cycle(proxies)
            for _ in range(10):
                proxy = next(proxy_pool)
                Soup = _fetch_through_proxy(URL, headers, proxy, timeout)
                if Soup is not None:
                    return [Soup, proxy]

        print("retrying in 60s")
        time.sleep(60)
        try:
            proxies = get_proxies()
        except requests.exceptions.RequestException:
            # Keep the previous list and try again next round.
            print("Could not refresh the proxy list")

In [4]:
def getnextpage(Soup, headers, prev_winner):
    """Follow the "next" pagination link of an Amazon results page.

    The proxy list is only downloaded once a next-page link has actually been
    found, so reaching the last page costs no extra network request.

    parameters:
               - Soup : a beautiful soup object representing the html of the current results page
               - headers : Dictionary, the header values passed with the request
               - prev_winner : String, the proxy that last connected successfully
    returns:
               - [Soup, winner] : the soup for the next page and the proxy that fetched it
               - or the string "End of Results" when the page has no
                 pagination strip or no next-page link
    """
    paginate = Soup.find('span', {'class': 's-pagination-strip'})
    if paginate is None:
        return "End of Results"

    next_link = paginate.find(
        'a',
        {'class': 's-pagination-item s-pagination-next s-pagination-button s-pagination-separator'},
    )
    if next_link is None:
        return "End of Results"

    url = 'https://www.amazon.com' + str(next_link['href'])
    proxies = get_proxies()
    res = get_winning_proxy(url, headers, proxies, prev_winner)
    print(res[1])
    return [res[0], res[1]]

In [5]:

def _parse_listing(card):
    """Return (title, price, stars, samples) for one result card, using "N/A" for anything missing."""
    price_tag = card.find(class_='a-price-whole')
    price = price_tag.text.strip() if price_tag else "N/A"

    title_tag = card.find(class_="a-size-medium a-color-base a-text-normal")
    title = title_tag.text.strip() if title_tag else "N/A"

    # Default first so a rating row without both labelled spans cannot leave
    # these unset or carrying the previous card's values.
    stars = "N/A"
    samples = "N/A"
    rating_row = card.find('div', {'class': 'a-row a-size-small'})
    if rating_row:
        ratings = rating_row.find_all('span', {'aria-label': True})
        if len(ratings) >= 2:
            stars = ratings[0].text.strip()
            samples = ratings[1].text.strip()
    return title, price, stars, samples


def _clean_listings(listings):
    """Drop unrated rows and convert Price, Rating and Sample Size to numbers."""
    cleaned = listings.loc[listings['Rating'] != 'N/A'].dropna().copy()

    # clean title
    cleaned['Title'] = cleaned['Title'].str.slice(0, 110)

    # clean price: remove thousands separators and the trailing "." that the
    # scraped whole-price text may carry; literal (non-regex) replacement so
    # the result does not depend on the pandas version's regex default
    price_text = cleaned['Price'].astype(str)
    price_text = price_text.str.replace(',', '', regex=False).str.rstrip('.')
    cleaned['Price'] = pd.to_numeric(price_text, errors='coerce')

    # clean rating: keep the first three characters of the rating text
    cleaned['Rating'] = pd.to_numeric(
        cleaned['Rating'].astype(str).str.slice(0, 3), errors='coerce')

    # clean sample: strip parentheses and thousands separators
    sample_text = cleaned['Sample Size'].astype(str)
    sample_text = sample_text.str.replace(r'[(),]', '', regex=True)
    cleaned['Sample Size'] = pd.to_numeric(sample_text, errors='coerce')

    # A row without a usable price cannot be charted.
    return cleaned.dropna(subset=['Price'])


def _load_history(path):
    """Read a history CSV; return None when the file is missing or empty."""
    try:
        return pd.read_csv(path)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        return None


def _plot_price_history(history, chart_title):
    """Draw one price-over-time series per product title and return the figure."""
    # In matplotlib format strings 'p' and 'o' are markers (pentagon, circle).
    line_formats = ['p-', 'r-', 'y-', 'g-', 'b-', 'o-', 'r--', 'g--', 'y--', 'b--', 'o--', 'p--']
    point_formats = ['yo', 'r^', 'go', 'y*', 'bo', 'ro', 'g*', 'y^', 'b^', 'yo', 'g^']

    figure = plt.figure(figsize=(11, 13))
    ax = figure.add_subplot(111)
    ax.set_position([0.10, 0.60, 0.8, 0.3])
    for idx, (product, rows) in enumerate(history.groupby('Title', sort=False)):
        # Wrap around the format lists so any number of products can be drawn.
        if len(rows.index) > 1:
            fmt = line_formats[idx % len(line_formats)]
        else:
            # a single observation cannot form a line; draw a marker instead
            fmt = point_formats[idx % len(point_formats)]
        ax.plot(rows['Date'], rows['Price'], fmt, label=str(product))
    ax.set_title(chart_title)
    if ax.lines:
        ax.legend(bbox_to_anchor=(0.5, -1.3), loc='lower center')
    return figure


def main(URL=''):
    """Scrape every page of an Amazon results listing and chart price history.

    Since this was used to collect price and rating data for computer hardware,
    some of the strings searched for in the html (the "3070" title filter and
    the brand names) are hardcoded and will need to be changed for other use
    cases. Today's listings are appended to one history CSV per brand
    (for example MSI_history.csv, created if missing) and a single pdf,
    AmazonOutput.pdf, is written with one price-history chart per brand.
    Each run adds another day of data to the charts.

    parameters:
               - URL : String, the page of Amazon results that you want to scrape
    raises:
               - ValueError when no URL is supplied
    """
    # ENTER THE PAGE OF AMAZON RESULTS THAT YOU WANT TO SCRAPE (or pass it in)
    if not URL:
        raise ValueError("main() needs the URL of an Amazon results page to scrape")

    headers = {
    'authority': 'www.amazon.com',
    'pragma': 'no-cache',
    'cache-control': 'no-cache',
    'dnt': '1',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (X11; CrOS x86_64 8172.45.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.64 Safari/537.36',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'sec-fetch-site': 'none',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-dest': 'document',
    'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8',
    }

    # First time connecting to the URL
    proxies = get_proxies()
    res = get_winning_proxy(URL, headers, proxies, 0)
    Soup = res[0]
    winner = res[1]

    todayDate = datetime.today().strftime("%Y-%m-%d")

    # loop through the amazon pages and collect one row per result card
    rows = []
    while True:
        cards = Soup.find_all('div', {'class': 'a-section a-spacing-small a-spacing-top-small'})
        for card in cards:
            title, price, stars, samples = _parse_listing(card)
            rows.append([title, price, todayDate, stars, samples])

        time.sleep(5)
        res = getnextpage(Soup, headers, winner)
        # getnextpage returns the sentinel string itself on the last page, so
        # compare the whole result instead of indexing into it.
        if isinstance(res, str):
            break
        Soup = res[0]
        winner = res[1]

    results = pd.DataFrame(rows, columns=['Title', 'Price', 'Date', 'Rating', 'Sample Size'])

    # Initial cleaning of the data
    has_data = (results['Title'] != 'N/A') & (results['Price'] != 'N/A')
    step1 = results.loc[has_data].dropna().drop_duplicates()
    step2 = step1.loc[step1['Title'].str.contains("3070")]

    # One history file and one chart per GPU brand
    brands = ['MSI', 'Gigabyte', 'Nvidia', 'Zotac', 'Asus', 'Evga']

    # The context manager closes the pdf even when a brand fails midway.
    with matplotlib.backends.backend_pdf.PdfPages("AmazonOutput.pdf") as pdf:
        for brand in brands:
            archive_path = brand + '_history.csv'
            todays = _clean_listings(
                step2.loc[step2['Title'].str.contains(brand, case=False)])

            # Append today's data to the stored history for this brand.
            previous = _load_history(archive_path)
            if previous is None:
                history = todays
            else:
                history = pd.concat([previous, todays])
            history['Date'] = pd.to_datetime(history['Date'])
            # sort_values returns a new frame; keep it so lines are drawn in date order
            history = history.sort_values(by=["Date"], kind="stable")
            history.to_csv(archive_path, index=False)

            # plot the data using matplotlib and place it on the pdf
            figure = _plot_price_history(history, archive_path)
            pdf.savefig(figure)
            plt.show()
            # release the figure so repeated runs do not accumulate open figures
            plt.close(figure)
