<a href="https://colab.research.google.com/github/girikanchan/colabassignment/blob/main/KanchanGiri_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This script defines several functions:

* get_latest_version(): Scrapes the Ubuntu Security repository to retrieve the latest version of Chromium browser for Ubuntu 18.04.

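* download(latest_version, quiet): Downloads the Chromium .deb packages (both codec packages, the browser, and chromedriver) for the retrieved version and installs each one with apt-get, deleting the file afterwards. The code comments describe the target system as Ubuntu 20.04, although the packages are the Ubuntu 18.04 builds. In quiet mode, wget runs with -q and the apt-get output is redirected to apt.log.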

* check_chromium_installation(): Checks whether Chromium is installed by attempting to run chromium-browser. It reports failure only when the executable cannot be found.

* install_selenium_package(quiet): Installs the Selenium package via pip. It also supports a quiet mode to suppress verbosity during installation.

* main(quiet): The main function orchestrates the entire process by calling the above functions in sequence. It first gets the latest Chromium version, downloads and installs Chromium and its dependencies, checks the installation, and finally installs Selenium.

The script can be executed as a standalone Python script, and you can set the quiet variable to control the verbosity of downloads and installations. When quiet is set to True, it will run in a quieter mode.
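
The version lookup described above comes down to one regular-expression search over the directory listing. The following is a minimal offline sketch of that step. `SAMPLE_LISTING` is a hypothetical excerpt (the real page may be laid out differently), and `extract_version` is an illustrative helper name that is not part of the script.

```python
import re

# Hypothetical excerpt of the directory listing HTML; the real page may differ.
SAMPLE_LISTING = (
    '<a href="chromium-browser_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb">'
    'chromium-browser_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb</a>'
)

# Same pattern as in get_latest_version(), written as a raw string.
PATTERN = r'<a\shref="chromium\-browser_([^"]+.ubuntu0\.18\.04\.1_amd64\.deb)'


def extract_version(html: str) -> str:
    """Return the version suffix of the first matching chromium-browser link."""
    match = re.search(PATTERN, html)
    if match is None:
        raise ValueError("No chromium-browser package link found")
    return match.group(1)


print(extract_version(SAMPLE_LISTING))
```

Because `re.search` stops at the first match, this approach only yields the latest build if the newest matching link appears first in the listing.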

In [8]:
import csv
import requests
from bs4 import BeautifulSoup

In [7]:
import os
import re
import subprocess
import requests

# The deb files we need to install
deb_files_startstwith = [
    "chromium-codecs-ffmpeg-extra_",
    "chromium-codecs-ffmpeg_",
    "chromium-browser_",
    "chromium-chromedriver_"
]

def get_latest_version() -> str:
    # A request to security.ubuntu.com for getting latest version of chromium-browser
    # e.g. "112.0.5615.49-0ubuntu0.18.04.1_amd64.deb"
    url = "http://security.ubuntu.com/ubuntu/pool/universe/c/chromium-browser/"
    r = requests.get(url)
    if r.status_code != 200:
        raise Exception(f"Unexpected status code: {r.status_code}")
    text = r.text

    # Find the latest version. re.search returns the first matching link in the
    # listing, which is assumed here to be the latest build.
    pattern = r'<a\shref="chromium\-browser_([^"]+.ubuntu0\.18\.04\.1_amd64\.deb)'
    latest_version_search = re.search(pattern, text)
    if latest_version_search:
        latest_version = latest_version_search.group(1)
    else:
        raise Exception("Cannot find latest version!")
    return latest_version

def download(latest_version: str, quiet: bool):
    deb_files = []
    for deb_file in deb_files_startstwith:
        deb_files.append(deb_file + latest_version)

    for deb_file in deb_files:
        url = f"http://security.ubuntu.com/ubuntu/pool/universe/c/chromium-browser/{deb_file}"

        # Download deb file
        if quiet:
            command = f"wget -q -O /content/{deb_file} {url}"
        else:
            command = f"wget -O /content/{deb_file} {url}"
        print(f"Downloading: {deb_file}")
        # os.system(command)
        !$command

        # Install deb file
        if quiet:
            command = f"apt-get install /content/{deb_file} >> apt.log"
        else:
            command = f"apt-get install /content/{deb_file}"
        print(f"Installing: {deb_file}\n")
        # os.system(command)
        !$command

        # Delete deb file from disk
        os.remove(f"/content/{deb_file}")

def check_chromium_installation():
    try:
        subprocess.call(["chromium-browser"])
        print("Chromium installation successfull.")
    except FileNotFoundError:
        print("Chromium Installation Failed!")

def install_selenium_package(quiet: bool):
    if quiet:
        !pip install selenium -qq >> pip.log
    else:
        !pip install selenium

def main(quiet: bool):
    # Get the latest version of chromium-browser built for Ubuntu 18.04
    latest_version = get_latest_version()
    # Download and install those chromium-browser packages (target system: Ubuntu 20.04)
    download(latest_version, quiet)
    # Check whether the installation succeeded
    check_chromium_installation()
    # Finally install selenium package
    install_selenium_package(quiet)

if __name__ == '__main__':
    quiet = True  # True suppresses most wget, apt-get, and pip output
    main(quiet)


Downloading: chromium-codecs-ffmpeg-extra_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb
Installing: chromium-codecs-ffmpeg-extra_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb

Downloading: chromium-codecs-ffmpeg_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb
Installing: chromium-codecs-ffmpeg_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb

Downloading: chromium-browser_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb
Installing: chromium-browser_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb

Downloading: chromium-chromedriver_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb
Installing: chromium-chromedriver_112.0.5615.49-0ubuntu0.18.04.1_amd64.deb

Chromium installation successfull.
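
The check above launches the browser and treats anything other than a missing executable as success. A gentler alternative is sketched below: it looks the binary up on PATH and runs it with `--version`. The helper name `is_installed` is hypothetical, and the sketch assumes that `chromium-browser` accepts `--version` and exits with status 0, which may not hold in every environment.

```python
import shutil
import subprocess


def is_installed(binary: str = "chromium-browser") -> bool:
    """Return True if `binary` is on PATH and `binary --version` exits with 0."""
    path = shutil.which(binary)
    if path is None:
        # The executable is not on PATH at all.
        return False
    try:
        result = subprocess.run(
            [path, "--version"], capture_output=True, text=True, timeout=30
        )
    except (OSError, subprocess.TimeoutExpired):
        # The binary exists but could not be started or did not finish in time.
        return False
    return result.returncode == 0


print(is_installed("no-such-binary-for-this-sketch"))
```

Unlike the original check, this does not open a browser window and it also reports failure when the binary exists but exits with an error.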


In [9]:
# install chromium, its driver, and selenium
!apt update
!apt install chromium-chromedriver
!pip install selenium

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]

In [6]:
pip install webdriver_manager


Collecting webdriver_manager
  Downloading webdriver_manager-4.0.0-py2.py3-none-any.whl (27 kB)
Collecting python-dotenv (from webdriver_manager)
  Downloading python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv, webdriver_manager
Successfully installed python-dotenv-1.0.0 webdriver_manager-4.0.0


In [10]:
import csv
import time
from selenium import webdriver
from selenium.common import NoSuchElementException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager


class GoogleMapScraper:
    def __init__(self):
        self.output_file_name = "google_map_business_data.csv"
        self.headless = False
        self.unique_check = []
        self.driver = self.config_driver()  # Initialize the driver here

    def config_driver(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=options)
        return driver

    def save_data(self, data):
        # Column order matches the rows built in get_business_info()
        header = ['company_name', 'rating', 'reviews_count', 'address', 'category', 'phone', 'website']
        with open(self.output_file_name, 'a', newline='', encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            # In append mode the position starts at the end, so 0 means the file is empty
            if csvfile.tell() == 0:
                writer.writerow(header)
            writer.writerow(data)


    def parse_contact(self, business):
        try:
            contact = business.find_elements(By.CLASS_NAME, "W4Efsd")[3].text.split("·")[-1].strip()
        except:
            contact = ""

        if "+1" not in contact:
            try:
                contact = business.find_elements(By.CLASS_NAME, "W4Efsd")[4].text.split("·")[-1].strip()
            except:
                contact = ""

        return contact


    def parse_rating_and_review_count(self, business):
        try:
            reviews_block = business.find_element(By.CLASS_NAME, 'AJB7ye').text.split("(")
            rating = reviews_block[0].strip()
            reviews_count = reviews_block[1].split(")")[0].strip()
        except:
            rating = ""
            reviews_count = ""

        return rating, reviews_count


    def parse_address_and_category(self, business):
        try:
            address_block = business.find_elements(By.CLASS_NAME, "W4Efsd")[2].text.split("·")
            if len(address_block) >= 2:
                address = address_block[1].strip()
                category = address_block[0].strip()
            elif len(address_block) == 1:
                address = ""
                category = address_block[0]
        except:
            address = ""
            category = ""

        return address, category


    def get_business_info(self):
        time.sleep(2)
        for business in self.driver.find_elements(By.CLASS_NAME, 'THOPZb'):
            name = business.find_element(By.CLASS_NAME, 'fontHeadlineSmall').text
            rating, reviews_count = self.parse_rating_and_review_count(business)
            address, category = self.parse_address_and_category(business)
            contact = self.parse_contact(business)
            try:
                website = business.find_element(By.CLASS_NAME, "lcr4fd").get_attribute("href")
            except NoSuchElementException:
                website = ""

            unique_id = "".join([name, rating, reviews_count, address, category, contact, website])
            if unique_id not in self.unique_check:
                data = [name, rating, reviews_count, address, category, contact, website]
                self.save_data(data)
                self.unique_check.append(unique_id)



    def load_companies(self, url):
        print("Getting business info", url)
        self.driver.get(url)
        time.sleep(5)
        panel_xpath = '//*[@id="QA0Szd"]/div/div/div[1]/div[2]/div/div[1]/div/div/div[2]/div[1]'
        scrollable_div = self.driver.find_element(By.XPATH, panel_xpath)
        # scrolling
        flag = True
        i = 0
        while flag:
            print(f"Scrolling to page {i + 2}")
            self.driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', scrollable_div)
            time.sleep(2)

            if "You've reached the end of the list." in self.driver.page_source:
                flag = False

            self.get_business_info()
            i += 1


# Take input from the user for location and industry
location = input("Enter the location: ")
industry = input("Enter the industry: ")


# Replace spaces with '+' in the location
location = location.replace(" ", "+")

# Replace spaces with '+' in the industry
industry = industry.replace(" ", "+")

# Generate the URL using f-strings
url = f"https://www.google.com/maps/search/{location}+{industry}/"

# Add the generated URL to your URLs list
urls = [url]


# The constructor already creates the driver, so config_driver() is not called again
business_scraper = GoogleMapScraper()
for url in urls:
    business_scraper.load_companies(url)

Enter the location: indore
Enter the industry: apna sweets
Getting business info https://www.google.com/maps/search/indore+apna+sweets/
Scrolling to page 2


# Code Description
The script uses the Selenium library to automate interactions with the Google Maps website. Here's a breakdown of the script along with some documentation:

Purpose of the Script:

The script is designed to scrape business information (such as company name, rating, reviews count, address, category, phone, and website) from Google Maps based on user-defined location and industry.

Dependencies:
* csv: Used for CSV file handling.
* time: Used for adding delays between web interactions.
* selenium: A web automation library for controlling a web browser through the program.
* webdriver_manager.chrome: Imported for managing the Chrome WebDriver binary. The code shown imports ChromeDriverManager but does not call it.

Class GoogleMapScraper:

This class represents the Google Maps scraper. It contains methods for configuring the web driver, saving data to a CSV file, and parsing various pieces of information from business listings.
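
Most of the parsing in this class is plain string splitting on the text of a listing element. As a rough illustration, the split performed by parse_rating_and_review_count can be reproduced without a browser. The input format `4.5(1,234)` is an assumption about what the element text looks like, and `split_rating_text` is a hypothetical helper name.

```python
from typing import Tuple


def split_rating_text(text: str) -> Tuple[str, str]:
    """Split text such as '4.5(1,234)' into ('4.5', '1,234').

    Mirrors the class method: if there is no '(', both fields come back empty.
    """
    rating, separator, rest = text.partition("(")
    if not separator:
        return "", ""
    return rating.strip(), rest.split(")")[0].strip()


print(split_rating_text("4.5(1,234)"))
```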

Methods:

* __init__(self): Initializes the scraper with default settings, creates a list to check for unique data, and configures the web driver.

* config_driver(self): Configures the Chrome WebDriver with the options --headless, --no-sandbox, and --disable-dev-shm-usage, and returns the driver.

* save_data(self, data): Appends data to the CSV file. It first writes the header row if the file is empty.

* parse_contact(self, business): Parses the contact information from a business listing.

* parse_rating_and_review_count(self, business): Parses the rating and review count from a business listing.

* parse_address_and_category(self, business): Parses the address, category, operating hours, and phone number from a business listing.

* get_business_info(self): Scrapes business information from the current page of Google Maps listings.

* load_companies(self, url): Loads Google Maps listings for a given URL, scrolls through the listings, and calls get_business_info() to scrape data.
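
The "write the header only if the file is empty" behaviour described for save_data can be sketched on its own. This is one possible way to do it, using a file-size check. `append_row`, `HEADER`, and the sample rows are illustrative and not taken from the notebook's output.

```python
import csv
import os
import tempfile

HEADER = ["company_name", "rating", "reviews_count", "address",
          "category", "phone", "website"]


def append_row(path: str, row: list) -> None:
    """Append `row` to the CSV at `path`, writing HEADER first if the file is new or empty."""
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        if needs_header:
            writer.writerow(HEADER)
        writer.writerow(row)


# Demonstration with made-up rows in a temporary directory.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv")
append_row(demo_path, ["Example Cafe", "4.5", "120", "1 Main St", "Cafe", "", ""])
append_row(demo_path, ["Example Bakery", "4.1", "35", "2 Main St", "Bakery", "", ""])

with open(demo_path, newline="", encoding="utf-8") as f:
    rows = list(csv.reader(f))
print(len(rows))  # header plus two data rows
```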


Input:

The script takes user input for location and industry.

* location: The geographical location where you want to search for businesses.
* industry: The industry or category of businesses you want to search for.

Usage:
* User inputs the location and industry.
* The script generates a Google Maps URL for the specified location and industry.
* The GoogleMapScraper object is created.
* The load_companies() method is called with the generated URL, which initiates the scraping process.
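
The URL generation step can be sketched with `urllib.parse.quote_plus`, which turns spaces into `+` in the same way as the script's `replace` calls and also escapes characters such as `&`. The function name `build_search_url` is hypothetical.

```python
from urllib.parse import quote_plus


def build_search_url(location: str, industry: str) -> str:
    """Build a Google Maps search URL from a location and an industry."""
    query = quote_plus(f"{location.strip()} {industry.strip()}")
    return f"https://www.google.com/maps/search/{query}/"


print(build_search_url("indore", "apna sweets"))
```

For the inputs shown in the run above, this produces the same URL as the original string replacement.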


Note:
* The script performs scrolling to load more business listings dynamically.
* It checks for unique business data to avoid duplicates in the CSV file.
* It can be modified to handle multiple URLs if you want to scrape data from multiple search results pages.
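
The duplicate check in the script joins all fields into one string and looks it up in a list. A possible variation is sketched below: it keys on a tuple of the fields and stores the keys in a set, which avoids collisions between rows such as `["ab", "c"]` and `["a", "bc"]` and keeps lookups fast as the list grows. `unique_rows` is an illustrative name.

```python
def unique_rows(rows):
    """Return rows in their original order with exact duplicates removed."""
    seen = set()
    result = []
    for row in rows:
        key = tuple(row)  # tuples are hashable and keep field boundaries
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


sample = [["ab", "c"], ["a", "bc"], ["ab", "c"]]
print(unique_rows(sample))
```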

In [24]:
import csv
import time
from selenium import webdriver
from selenium.common import NoSuchElementException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager


class GoogleMapScraper:
    def __init__(self):
        self.output_file_name = "google_map_business_data.csv"
        self.headless = False
        self.unique_check = []
        self.driver = self.config_driver()  # Initialize the driver here

    def config_driver(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        driver = webdriver.Chrome(options=options)
        return driver

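    def save_data(self, data):
        # Column order matches the rows built in get_business_info()
        header = ['company_name', 'rating', 'reviews_count', 'category', 'operating_hours', 'phone_number', 'contact', 'website']
        with open(self.output_file_name, 'a', newline='', encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            # In append mode the position starts at the end, so 0 means the file is empty
            if csvfile.tell() == 0:
                writer.writerow(header)
            writer.writerow(data)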


    def parse_contact(self, business):
        try:
            contact = business.find_elements(By.CLASS_NAME, "W4Efsd")[3].text.split("·")[-1].strip()
        except:
            contact = ""

        if "+1" not in contact:
            try:
                contact = business.find_elements(By.CLASS_NAME, "W4Efsd")[4].text.split("·")[-1].strip()
            except:
                contact = ""

        return contact


    def parse_rating_and_review_count(self, business):
        try:
            reviews_block = business.find_element(By.CLASS_NAME, 'AJB7ye').text.split("(")
            rating = reviews_block[0].strip()
            reviews_count = reviews_block[1].split(")")[0].strip()
        except:
            rating = ""
            reviews_count = ""

        return rating, reviews_count

    def parse_address_and_category(self, business):
        # The third "W4Efsd" block is assumed to hold "category · hours · phone"
        category = operating_hours = phone_number = address_info = ""
        try:
            text = business.find_elements(By.CLASS_NAME, "W4Efsd")[2].text
            parts = [part.strip() for part in text.split("·")]
            category = parts[0]
            # Everything after the category, kept as one string
            address_info = " · ".join(parts[1:])
            if len(parts) > 1:
                operating_hours = parts[1]
            if len(parts) > 2:
                phone_number = parts[2]
        except Exception:
            # Leave every field empty when the block is missing or unreadable
            pass

        return address_info, category, operating_hours, phone_number

    def get_business_info(self):
        time.sleep(2)
        for business in self.driver.find_elements(By.CLASS_NAME, 'THOPZb'):
            name = business.find_element(By.CLASS_NAME, 'fontHeadlineSmall').text
            rating, reviews_count = self.parse_rating_and_review_count(business)
            address_info, category, operating_hours, phone_number = self.parse_address_and_category(business)
            contact = self.parse_contact(business)
            try:
                website = business.find_element(By.CLASS_NAME, "lcr4fd").get_attribute("href")
            except NoSuchElementException:
                website = ""

            unique_id = "".join([name, rating, reviews_count, category, operating_hours, phone_number, contact, website])
            if unique_id not in self.unique_check:
                data = [name, rating, reviews_count, category, operating_hours, phone_number, contact, website]
                self.save_data(data)
                self.unique_check.append(unique_id)



    def load_companies(self, url):
        print("Getting business info", url)
        self.driver.get(url)
        time.sleep(5)
        panel_xpath = '//*[@id="QA0Szd"]/div/div/div[1]/div[2]/div/div[1]/div/div/div[2]/div[1]'
        scrollable_div = self.driver.find_element(By.XPATH, panel_xpath)
        # scrolling
        flag = True
        i = 0
        while flag:
            print(f"Scrolling to page {i + 2}")
            self.driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', scrollable_div)
            time.sleep(2)

            if "You've reached the end of the list." in self.driver.page_source:
                flag = False

            self.get_business_info()
            i += 1


# Take input from the user for location and industry
location = input("Enter the location: ")
industry = input("Enter the industry: ")


# Replace spaces with '+' in the location
location = location.replace(" ", "+")

# Replace spaces with '+' in the industry
industry = industry.replace(" ", "+")

# Generate the URL using f-strings
url = f"https://www.google.com/maps/search/{location}+{industry}/"

# Add the generated URL to your URLs list
urls = [url]


# The constructor already creates the driver, so config_driver() is not called again
business_scraper = GoogleMapScraper()
for url in urls:
    business_scraper.load_companies(url)

Enter the location: indore
Enter the industry: hospital
Getting business info https://www.google.com/maps/search/indore+hospital/
Scrolling to page 2
Scrolling to page 3
Scrolling to page 4
Scrolling to page 5
Scrolling to page 6
Scrolling to page 7
Scrolling to page 8
Scrolling to page 9
Scrolling to page 10
Scrolling to page 11
Scrolling to page 12
Scrolling to page 13
Scrolling to page 14
Scrolling to page 15
Scrolling to page 16
Scrolling to page 17
Scrolling to page 18


# Import necessary libraries
Import the required libraries for working with CSV files, sending emails, and sending SMS messages using Twilio.
# Define email server settings

Set the email address and password of the sender's Gmail account. Make sure to replace 'Generated_password' with the actual password.

# Define Twilio credentials

Set your Twilio account SID, authentication token, and the Twilio phone number from which you want to send SMS messages.
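
Rather than typing the password and tokens into the notebook, one option is to read them from environment variables. The sketch below assumes that approach. The variable names in `REQUIRED_VARS` and the helper `load_settings` are made up for illustration and are not defined by Gmail or Twilio.

```python
import os

# Hypothetical variable names; choose whatever fits your environment.
REQUIRED_VARS = (
    "SENDER_EMAIL",
    "SENDER_PASSWORD",
    "TWILIO_ACCOUNT_SID",
    "TWILIO_AUTH_TOKEN",
    "TWILIO_PHONE_NUMBER",
)


def load_settings(env=os.environ) -> dict:
    """Return the required settings from `env`, or raise if any are missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling `load_settings()` before the CSV loop would make the script fail early with a clear message instead of failing on the first send attempt.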

# Open and read the CSV file
with open('google_map_business_data.csv', newline='') as csvfile:
    reader = csv.reader(csvfile)
    
    # Skip the header row if it exists
    next(reader, None)
    
    for row in reader:
        name, rating, bed_count, category, address, phone_number, email, website = row
This opens and reads the CSV file named 'google_map_business_data.csv'. The code assumes the columns appear in the order name, rating, bed_count, category, address, phone_number, email, and website, and it skips the header row using next(reader, None).

        # Send email
        subject = f'Information about {name}'
        message = f'Rating: {rating}\nBed Count: {bed_count}\nCategory: {category}\nAddress: {address}\nWebsite: {website}'
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = email_address
        msg['To'] = email
Prepare the email message. It sets the subject, message body, and sender/recipient addresses.

        try:
            # Connect to Gmail's SMTP server
            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(email_address, email_password)

            # Send the email
            server.sendmail(email_address, email, msg.as_string())
            server.quit()

            print(f"Email sent to {name} at {email}")

        except Exception as e:
            print(f"Error sending email to {name}: {str(e)}")
Try to send the email using Gmail's SMTP server. If successful, it prints a confirmation message; otherwise, it prints an error message.

        # Send SMS
        try:
            client = Client(twilio_account_sid, twilio_auth_token)

            # Create and send an SMS message
            message = client.messages.create(
                body=f"Information about {name}:\nRating: {rating}\nBed Count: {bed_count}\nCategory: {category}",
                from_=twilio_phone_number,
                to=phone_number
            )

            print(f"SMS sent to {name} at {phone_number}")

        except Exception as e:
            print(f"Error sending SMS to {name}: {str(e)}")
Try to send an SMS using Twilio. If successful, it prints a confirmation message; otherwise, it prints an error message.
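
The row unpacking in the walkthrough above raises a ValueError as soon as a row does not have exactly eight fields. A minimal sketch of a more tolerant reader follows. `iter_valid_rows` and `EXPECTED_COLUMNS` are hypothetical names, and the sample data is made up.

```python
import csv
import io

EXPECTED_COLUMNS = 8  # name, rating, bed_count, category, address, phone_number, email, website


def iter_valid_rows(csvfile):
    """Yield rows with exactly EXPECTED_COLUMNS fields, skipping the header and reporting the rest."""
    reader = csv.reader(csvfile)
    next(reader, None)  # skip the header row if there is one
    for row_number, row in enumerate(reader, start=2):
        if len(row) != EXPECTED_COLUMNS:
            print(f"Skipping row {row_number}: expected {EXPECTED_COLUMNS} fields, got {len(row)}")
            continue
        yield row


# Made-up sample: one header, one complete row, one short row.
sample_csv = io.StringIO(
    "name,rating,bed_count,category,address,phone_number,email,website\n"
    "Example Clinic,4.5,10,Hospital,1 Main St,123,owner@example.com,http://example.com\n"
    "Broken Row,4.0\n"
)
valid_rows = list(iter_valid_rows(sample_csv))
print(len(valid_rows))
```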


Documentation:

* Library Imports: The script begins by importing the necessary Python libraries, including csv for handling CSV files, smtplib and ssl for sending emails, MIMEText for creating email messages, and Client for sending SMS messages via Twilio.

* Configuration: Set up the configuration details, including email server settings (email_address and email_password) and Twilio credentials (twilio_account_sid, twilio_auth_token, and twilio_phone_number).

* CSV File Processing: The script opens and reads a CSV file named 'google_map_business_data.csv', assuming it contains business data with specific columns. It skips the header row using next(reader, None).

* Email Sending: For each row in the CSV file, the script constructs an email message with information about the business and attempts to send it using Gmail's SMTP server. Any errors during email sending are caught and reported.

* SMS Sending: The script also attempts to send an SMS message to the provided phone number using Twilio. Any errors during SMS sending are caught and reported.

Ensure you have the necessary libraries installed (e.g., twilio) and that you've replaced the placeholder values with your actual credentials and CSV file path before running the script.
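
The email step can be tried out without a mail server by building the message separately from sending it. The sketch below uses `email.message.EmailMessage`, which the script imports but does not use, instead of `MIMEText`. `build_message` and the example values are hypothetical.

```python
from email.message import EmailMessage


def build_message(sender: str, recipient: str, name: str,
                  rating: str, category: str, website: str) -> EmailMessage:
    """Build (but do not send) an email describing one business."""
    msg = EmailMessage()
    msg["Subject"] = f"Information about {name}"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(f"Rating: {rating}\nCategory: {category}\nWebsite: {website}")
    return msg


example = build_message(
    "sender@gmail.com", "owner@example.com",
    "Example Clinic", "4.5", "Hospital", "http://example.com",
)
print(example["Subject"])
```

A message built this way can be passed to `smtplib.SMTP.send_message`, so the connection handling in the script would stay the same.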

In [None]:
import csv
import smtplib
import ssl

from email.mime.text import MIMEText
from twilio.rest import Client
from email.message import EmailMessage

# Define email server settings
email_address = 'sender@gmail.com'
email_password = 'Generated_password'

# Define Twilio credentials
twilio_account_sid = 'your_account_sid'
twilio_auth_token = 'your_auth_token'
twilio_phone_number = 'your_twilio_phone_number'

# Open and read the CSV file
with open('google_map_business_data.csv', newline='') as csvfile:
    reader = csv.reader(csvfile)

    # Skip the header row if it exists
    next(reader, None)

    for row in reader:
        name, rating, bed_count, category, address, phone_number, email, website = row

        # Send email
        subject = f'Information about {name}'
        message = f'Rating: {rating}\nBed Count: {bed_count}\nCategory: {category}\nAddress: {address}\nWebsite: {website}'
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = email_address
        msg['To'] = email

        try:
            # Connect to Gmail's SMTP server
            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(email_address, email_password)

            # Send the email
            server.sendmail(email_address, email, msg.as_string())
            server.quit()

            print(f"Email sent to {name} at {email}")

        except Exception as e:
            print(f"Error sending email to {name}: {str(e)}")

        # Send SMS
        try:
            client = Client(twilio_account_sid, twilio_auth_token)

            # Create and send an SMS message
            message = client.messages.create(
                body=f"Information about {name}:\nRating: {rating}\nBed Count: {bed_count}\nCategory: {category}",
                from_=twilio_phone_number,
                to=phone_number
            )

            print(f"SMS sent to {name} at {phone_number}")

        except Exception as e:
            print(f"Error sending SMS to {name}: {str(e)}")


In [38]:
pip install twilio