## Web Scraping in Social Media - Pinterest

This project uses web scraping to gather data from social media platforms such as Pinterest. It focuses on extracting image data and pin details (image URLs and alt text) so they can be analysed and applied in other domains.

## Pinterest Pin Data Retrieval Script

In [1]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.service import Service as EdgeService
from webdriver_manager.microsoft import EdgeChromiumDriverManager
import time
import pandas as pd

def fetch_pinterest_pins(username, max_pins=10, wait_seconds=15):
    """Scrape image URLs and alt text from a Pinterest profile page into a CSV file.

    Opens ``https://www.pinterest.com/<username>/`` in a headless Edge browser,
    scrolls to the bottom of the page once, and records the ``src`` and ``alt``
    attributes of up to ``max_pins`` ``<img>`` elements that carry both
    attributes. The result is written to ``<username>_pins.csv`` in the current
    working directory.

    Args:
        username: Pinterest username; used in the profile URL and in the CSV
            file name.
        max_pins: Maximum number of image elements to record.
        wait_seconds: Seconds to pause after loading the page and again after
            scrolling, giving the page time to render its images.

    Returns:
        pandas.DataFrame with the columns "Image URL" and "Alt Text", one row
        per recorded image (the same data that is written to the CSV file).
    """
    # Setup the Edge WebDriver
    options = webdriver.EdgeOptions()
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')

    service = EdgeService(EdgeChromiumDriverManager().install())
    driver = webdriver.Edge(service=service, options=options)

    try:
        driver.get(f"https://www.pinterest.com/{username}/")

        # Wait for the page to fully load
        time.sleep(wait_seconds)

        # Scroll down to load more pins (if needed)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(wait_seconds)

        # Attributes must be read while the browser session is still open.
        images = driver.find_elements(By.CSS_SELECTOR, 'img[src][alt]')
        pin_details = [
            {
                'Image URL': image.get_attribute('src'),
                'Alt Text': image.get_attribute('alt'),
            }
            for image in images[:max_pins]
        ]
    finally:
        # Always release the browser, even when scraping fails.
        driver.quit()

    # Save only after a successful scrape: writing from inside ``finally`` would
    # report success after a failure and could mask the original exception.
    output_path = f"{username}_pins.csv"
    # Explicit columns keep the CSV header present even when no image was found.
    df = pd.DataFrame(pin_details, columns=['Image URL', 'Alt Text'])
    df.to_csv(output_path, index=False)
    print(f"Saved pin details to {output_path}")
    return df

# Main block to take user input
if __name__ == "__main__":
    # Strip stray whitespace so it does not end up in the profile URL or the CSV file name.
    username = input("Enter the Pinterest username: ").strip()
    if username:
        fetch_pinterest_pins(username, max_pins=10)
    else:
        # An empty name would scrape the site root and write "_pins.csv".
        print("No username entered; nothing to scrape.")

Enter the Pinterest username: MariMoon
Saved pin details to MariMoon_pins.csv


## Pinterest Image Scraping Class

In [2]:
import json
import os
import cv2
import numpy as np
from requests import get
from bs4 import BeautifulSoup as soup
from concurrent.futures import ThreadPoolExecutor
from pydotmap import DotMap

class PinterestImageScraper:
    """Find Pinterest pages through a Bing search and download the images they reference.

    ``scrape`` drives the whole workflow: it searches Bing for the keyword,
    fetches each Pinterest result page, extracts the JSON embedded in the page,
    collects the image URLs found in it and downloads them with a thread pool,
    skipping images whose hash was already seen.
    """

    # Seconds to wait for any single HTTP request; without a timeout a stalled
    # server would block the scraper indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.json_data_list = []  # List to store JSON data from Pinterest
        self.unique_img = []  # List to store unique image hashes

    @staticmethod
    def get_pinterest_links(body, max_images):
        """Return the text of Bing result ``<cite>`` tags that mention "pinterest".

        Args:
            body: HTML of the Bing results page.
            max_images: Stop after this many links; a falsy value means no limit.

        Returns:
            List of the matching ``<cite>`` texts, in page order.
        """
        searched_urls = []
        html = soup(body, 'html.parser')
        for cite in html.select('#b_results cite'):
            text = cite.text
            if "pinterest" not in text:
                continue
            searched_urls.append(text)
            if max_images and len(searched_urls) >= max_images:
                break
        return searched_urls

    def get_source(self, url, proxies):
        """Fetch ``url`` and store the JSON embedded in its page in ``json_data_list``.

        The JSON is read from the ``<script>`` tag with id
        ``__PWS_INITIAL_PROPS__`` or, failing that, ``__PWS_DATA__``. An empty
        dict is stored when neither tag exists. Any exception (network error,
        invalid JSON, ...) leaves ``json_data_list`` untouched.
        """
        try:
            res = get(url, proxies=proxies, timeout=self.REQUEST_TIMEOUT)
            html = soup(res.text, 'html.parser')
            json_data = (
                html.find("script", {"id": "__PWS_INITIAL_PROPS__"})
                or html.find("script", {"id": "__PWS_DATA__"})
            )
            if json_data:
                self.json_data_list.append(json.loads(json_data.string))
            else:
                self.json_data_list.append({})
        except Exception:
            # Best effort: one unreachable or malformed page must not stop the run.
            return

    def save_image_url(self, max_images):
        """Collect the unique image URLs found in ``json_data_list``.

        For every stored JSON document the pins are read from
        ``initialReduxState.pins`` (or ``props.initialReduxState.pins``) and
        the ``url`` of each pin's ``images["orig"]`` entry (a single entry or a
        list of entries) is collected. Documents that do not have the expected
        structure are skipped.

        Args:
            max_images: Maximum number of URLs to return; a falsy value disables
                the early stop while collecting.

        Returns:
            List of distinct URLs in the order they were first seen, truncated
            to ``max_images``.
        """
        # A dict is used as an insertion-ordered set so the result is the same
        # on every run (a plain ``set`` has no stable order for strings).
        unique_urls = {}
        for js in self.json_data_list:
            try:
                data = DotMap(js)  # Convert dictionary to DotMap for easier access
                if data.initialReduxState:
                    pins = data.initialReduxState.pins
                else:
                    pins = data.props.initialReduxState.pins
                for pin in pins.values():
                    images = pin.images.get("orig")
                    candidates = images if isinstance(images, list) else [images]
                    for img in candidates:
                        url = img.get("url")
                        if url:  # Ignore entries without a usable URL
                            unique_urls.setdefault(url, None)
                # Count distinct URLs so duplicates cannot end the search early.
                if max_images and len(unique_urls) >= max_images:
                    break
            except Exception:
                # Structure differs from what is expected; try the next document.
                continue
        return list(unique_urls)[:max_images]

    def dhash(self, image, hash_size=8):
        """Return an integer hash built from comparisons of horizontally adjacent pixels.

        The image is resized to ``hash_size + 1`` columns by ``hash_size`` rows;
        every value that is greater than its left neighbour sets one bit of the
        result.
        """
        resized = cv2.resize(image, (hash_size + 1, hash_size))
        diff = resized[:, 1:] > resized[:, :-1]
        return sum(2 ** i for i, v in enumerate(diff.flatten()) if v)

    def save_images(self, url_list, folder_name):
        """Download every URL in ``url_list`` into ``folder_name``.

        An image is written only when its ``dhash`` is not yet in
        ``unique_img``; the file name is the last path segment of the URL.
        URLs that cannot be fetched, decoded or written are reported and
        skipped so that one bad URL does not abort the rest of the list.
        """
        # ``exist_ok`` avoids a FileExistsError when several worker threads
        # try to create the same folder at the same time.
        os.makedirs(folder_name, exist_ok=True)
        for img_url in url_list:
            try:
                result = get(img_url, timeout=self.REQUEST_TIMEOUT).content
                img_arr = np.asarray(bytearray(result), dtype="uint8")
                image = cv2.imdecode(img_arr, cv2.IMREAD_COLOR)
                if image is None:
                    # imdecode returns None when the bytes are not a valid image.
                    print(f"Skipping {img_url}: response is not a decodable image")
                    continue
                img_hash = self.dhash(image)
                if img_hash not in self.unique_img:
                    file_name = os.path.join(folder_name, img_url.split("/")[-1])
                    cv2.imwrite(file_name, image)
                    self.unique_img.append(img_hash)
            except Exception as exc:
                print(f"Skipping {img_url}: {exc}")

    def download(self, url_list, num_workers, output_folder, limit=10):
        """Download ``url_list`` concurrently with ``save_images``.

        Args:
            url_list: Image URLs to download.
            num_workers: Desired number of worker threads; clamped to the range
                ``1 .. len(url_list)``.
            output_folder: Folder the images are written to.
            limit: Only the first ``limit`` URLs are downloaded (``None`` means
                all of them).
        """
        url_list = url_list[:limit]
        worker_count = max(1, min(num_workers, len(url_list)))
        # Round-robin split: every URL lands in exactly one chunk, including
        # the remainder when the list does not divide evenly.
        chunks = [url_list[i::worker_count] for i in range(worker_count)]
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [
                executor.submit(self.save_images, chunk, output_folder)
                for chunk in chunks
            ]
            for future in futures:
                future.result()  # Surface worker errors instead of dropping them

    @staticmethod
    def _build_search_url(key):
        """Return the Bing search URL for ``key`` plus the word "pinterest"."""
        from urllib.parse import quote

        # Percent-encode the whole query so characters such as "&" or "#"
        # cannot break the URL; spaces still become "%20".
        query = quote(f"{key} pinterest", safe="")
        return f'https://www.bing.com/search?q={query}&first=1&FORM=PERE'

    @staticmethod
    def start_scraping(max_images, key, proxies):
        """Search Bing for ``key`` and return ``(pinterest_links, http_status_code)``."""
        assert key, "Please provide a keyword for searching images"
        url = PinterestImageScraper._build_search_url(key)
        res = get(
            url,
            proxies=proxies,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=PinterestImageScraper.REQUEST_TIMEOUT,
        )
        return PinterestImageScraper.get_pinterest_links(res.content, max_images), res.status_code

    def scrape(self, key, output_folder="", proxies=None, threads=10, max_images=10):
        """Search for ``key`` and download the images found on the resulting Pinterest pages.

        Args:
            key: Search keyword.
            output_folder: Target folder; defaults to ``key`` with spaces
                replaced by underscores.
            proxies: Proxy mapping passed to every page request.
            threads: Number of download worker threads.
            max_images: Maximum number of search results and image URLs to use.
                ``download`` additionally applies its own default limit.

        Returns:
            Dict with the keys "isDownloaded", "urls_list" and "keyword".
        """
        if proxies is None:
            proxies = {}
        extracted_urls, _status_code = PinterestImageScraper.start_scraping(max_images, key, proxies)
        # Start every run from a clean state.
        self.unique_img = []
        self.json_data_list = []

        for url in extracted_urls:
            self.get_source(url, proxies)

        urls_list = self.save_image_url(max_images)

        if urls_list:
            self.download(urls_list, threads, output_folder or key.replace(" ", "_"))
            return {"isDownloaded": True, "urls_list": urls_list, "keyword": key}
        return {"isDownloaded": False, "urls_list": [], "keyword": key}

if __name__ == "__main__":
    # Ask for a search term; the output folder is the term with spaces turned into underscores.
    keyword = input("Enter the keyword for searching images: ")
    output_folder = "_".join(keyword.split(" "))
    scraper = PinterestImageScraper()
    details = scraper.scrape(
        keyword,
        output_folder=output_folder,
        proxies={},
        threads=10,
        max_images=10,
    )
    # Report whether any image URLs were found and handed to the downloader.
    print("Downloading completed !!" if details["isDownloaded"] else "Nothing to download !!")

Enter the keyword for searching images: nature
Downloading completed !!


In [4]:
import json
import os
import cv2
import numpy as np
from requests import get
from bs4 import BeautifulSoup as soup
from concurrent.futures import ThreadPoolExecutor
from pydotmap import DotMap

# NOTE(review): a class with this name is also defined in an earlier cell of
# this notebook; consider defining it once and reusing it from later cells.
class PinterestImageScraper:
    """Find Pinterest pages through a Bing search and download the images they reference.

    ``scrape`` drives the whole workflow: it searches Bing for the keyword,
    fetches each Pinterest result page, extracts the JSON embedded in the page,
    collects the image URLs found in it and downloads them with a thread pool,
    skipping images whose hash was already seen.
    """

    # Seconds to wait for any single HTTP request; without a timeout a stalled
    # server would block the scraper indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.json_data_list = []  # List to store JSON data from Pinterest
        self.unique_img = []  # List to store unique image hashes

    @staticmethod
    def get_pinterest_links(body, max_images):
        """Return the text of Bing result ``<cite>`` tags that mention "pinterest".

        Args:
            body: HTML of the Bing results page.
            max_images: Stop after this many links; a falsy value means no limit.

        Returns:
            List of the matching ``<cite>`` texts, in page order.
        """
        searched_urls = []
        html = soup(body, 'html.parser')
        for cite in html.select('#b_results cite'):
            text = cite.text
            if "pinterest" not in text:
                continue
            searched_urls.append(text)
            if max_images and len(searched_urls) >= max_images:
                break
        return searched_urls

    def get_source(self, url, proxies):
        """Fetch ``url`` and store the JSON embedded in its page in ``json_data_list``.

        The JSON is read from the ``<script>`` tag with id
        ``__PWS_INITIAL_PROPS__`` or, failing that, ``__PWS_DATA__``. An empty
        dict is stored when neither tag exists. Any exception (network error,
        invalid JSON, ...) leaves ``json_data_list`` untouched.
        """
        try:
            res = get(url, proxies=proxies, timeout=self.REQUEST_TIMEOUT)
            html = soup(res.text, 'html.parser')
            json_data = (
                html.find("script", {"id": "__PWS_INITIAL_PROPS__"})
                or html.find("script", {"id": "__PWS_DATA__"})
            )
            if json_data:
                self.json_data_list.append(json.loads(json_data.string))
            else:
                self.json_data_list.append({})
        except Exception:
            # Best effort: one unreachable or malformed page must not stop the run.
            return

    def save_image_url(self, max_images):
        """Collect the unique image URLs found in ``json_data_list``.

        For every stored JSON document the pins are read from
        ``initialReduxState.pins`` (or ``props.initialReduxState.pins``) and
        the ``url`` of each pin's ``images["orig"]`` entry (a single entry or a
        list of entries) is collected. Documents that do not have the expected
        structure are skipped.

        Args:
            max_images: Maximum number of URLs to return; a falsy value disables
                the early stop while collecting.

        Returns:
            List of distinct URLs in the order they were first seen, truncated
            to ``max_images``.
        """
        # A dict is used as an insertion-ordered set so the result is the same
        # on every run (a plain ``set`` has no stable order for strings).
        unique_urls = {}
        for js in self.json_data_list:
            try:
                data = DotMap(js)  # Convert dictionary to DotMap for easier access
                if data.initialReduxState:
                    pins = data.initialReduxState.pins
                else:
                    pins = data.props.initialReduxState.pins
                for pin in pins.values():
                    images = pin.images.get("orig")
                    candidates = images if isinstance(images, list) else [images]
                    for img in candidates:
                        url = img.get("url")
                        if url:  # Ignore entries without a usable URL
                            unique_urls.setdefault(url, None)
                # Count distinct URLs so duplicates cannot end the search early.
                if max_images and len(unique_urls) >= max_images:
                    break
            except Exception:
                # Structure differs from what is expected; try the next document.
                continue
        return list(unique_urls)[:max_images]

    def dhash(self, image, hash_size=8):
        """Return an integer hash built from comparisons of horizontally adjacent pixels.

        The image is resized to ``hash_size + 1`` columns by ``hash_size`` rows;
        every value that is greater than its left neighbour sets one bit of the
        result.
        """
        resized = cv2.resize(image, (hash_size + 1, hash_size))
        diff = resized[:, 1:] > resized[:, :-1]
        return sum(2 ** i for i, v in enumerate(diff.flatten()) if v)

    def save_images(self, url_list, folder_name):
        """Download every URL in ``url_list`` into ``folder_name``.

        An image is written only when its ``dhash`` is not yet in
        ``unique_img``; the file name is the last path segment of the URL.
        URLs that cannot be fetched, decoded or written are reported and
        skipped so that one bad URL does not abort the rest of the list.
        """
        # ``exist_ok`` avoids a FileExistsError when several worker threads
        # try to create the same folder at the same time.
        os.makedirs(folder_name, exist_ok=True)
        for img_url in url_list:
            try:
                result = get(img_url, timeout=self.REQUEST_TIMEOUT).content
                img_arr = np.asarray(bytearray(result), dtype="uint8")
                image = cv2.imdecode(img_arr, cv2.IMREAD_COLOR)
                if image is None:
                    # imdecode returns None when the bytes are not a valid image.
                    print(f"Skipping {img_url}: response is not a decodable image")
                    continue
                img_hash = self.dhash(image)
                if img_hash not in self.unique_img:
                    file_name = os.path.join(folder_name, img_url.split("/")[-1])
                    cv2.imwrite(file_name, image)
                    self.unique_img.append(img_hash)
            except Exception as exc:
                print(f"Skipping {img_url}: {exc}")

    def download(self, url_list, num_workers, output_folder, limit=10):
        """Download ``url_list`` concurrently with ``save_images``.

        Args:
            url_list: Image URLs to download.
            num_workers: Desired number of worker threads; clamped to the range
                ``1 .. len(url_list)``.
            output_folder: Folder the images are written to.
            limit: Only the first ``limit`` URLs are downloaded (``None`` means
                all of them).
        """
        url_list = url_list[:limit]
        worker_count = max(1, min(num_workers, len(url_list)))
        # Round-robin split: every URL lands in exactly one chunk, including
        # the remainder when the list does not divide evenly.
        chunks = [url_list[i::worker_count] for i in range(worker_count)]
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [
                executor.submit(self.save_images, chunk, output_folder)
                for chunk in chunks
            ]
            for future in futures:
                future.result()  # Surface worker errors instead of dropping them

    @staticmethod
    def _build_search_url(key):
        """Return the Bing search URL for ``key`` plus the word "pinterest"."""
        from urllib.parse import quote

        # Percent-encode the whole query so characters such as "&" or "#"
        # cannot break the URL; spaces still become "%20".
        query = quote(f"{key} pinterest", safe="")
        return f'https://www.bing.com/search?q={query}&first=1&FORM=PERE'

    @staticmethod
    def start_scraping(max_images, key, proxies):
        """Search Bing for ``key`` and return ``(pinterest_links, http_status_code)``."""
        assert key, "Please provide a keyword for searching images"
        url = PinterestImageScraper._build_search_url(key)
        res = get(
            url,
            proxies=proxies,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=PinterestImageScraper.REQUEST_TIMEOUT,
        )
        return PinterestImageScraper.get_pinterest_links(res.content, max_images), res.status_code

    def scrape(self, key, output_folder="", proxies=None, threads=10, max_images=10):
        """Search for ``key`` and download the images found on the resulting Pinterest pages.

        Args:
            key: Search keyword.
            output_folder: Target folder; defaults to ``key`` with spaces
                replaced by underscores.
            proxies: Proxy mapping passed to every page request.
            threads: Number of download worker threads.
            max_images: Maximum number of search results and image URLs to use.
                ``download`` additionally applies its own default limit.

        Returns:
            Dict with the keys "isDownloaded", "urls_list" and "keyword".
        """
        if proxies is None:
            proxies = {}
        extracted_urls, _status_code = PinterestImageScraper.start_scraping(max_images, key, proxies)
        # Start every run from a clean state.
        self.unique_img = []
        self.json_data_list = []

        for url in extracted_urls:
            self.get_source(url, proxies)

        urls_list = self.save_image_url(max_images)

        if urls_list:
            self.download(urls_list, threads, output_folder or key.replace(" ", "_"))
            return {"isDownloaded": True, "urls_list": urls_list, "keyword": key}
        return {"isDownloaded": False, "urls_list": [], "keyword": key}

if __name__ == "__main__":
    # Ask for a search term; the output folder is the term with spaces turned into underscores.
    keyword = input("Enter the keyword for searching images: ")
    output_folder = "_".join(keyword.split(" "))
    scraper = PinterestImageScraper()
    details = scraper.scrape(
        keyword,
        output_folder=output_folder,
        proxies={},
        threads=10,
        max_images=10,
    )
    # Report whether any image URLs were found and handed to the downloader.
    print("Downloading completed !!" if details["isDownloaded"] else "Nothing to download !!")

Enter the keyword for searching images: ocean
Downloading completed !!
