Blueprint: Extracting Data from an API using the Requests Module 

In [1]:
import requests

response = requests.get('https://api.github.com/repositories',
                        headers = {'Accept': 'application/vnd.github.v3.full+json'})

print(response.status_code) #response code 200 shows it was successful 

200


In [2]:
#inspecting the encoding and headers of the response
#helps ensure that you parse it accurately:

print(response.encoding)
print(response.headers['Content-Type'])
print(response.headers['Server'])

utf-8
application/json; charset=utf-8
github.com


Looking at the response parameters, we see that the response uses
UTF-8 encoding and that the content is returned in JSON format.

Since we already know that the response is JSON, we can also
use the json() method to read it. Here this returns a list in which each
element is a repository.

In [3]:
import json
print(json.dumps(response.json()[0], indent=2)[:200])  #limited output to the 200 chars in the json

{
  "id": 1,
  "node_id": "MDEwOlJlcG9zaXRvcnkx",
  "name": "grit",
  "full_name": "mojombo/grit",
  "private": false,
  "owner": {
    "login": "mojombo",
    "id": 1,
    "node_id": "MDQ6VXNlcjE=",



In [4]:
#the previous list of repos is not helpful when looking for specific programming languages or topics
#use the Search API:

response = requests.get('https://api.github.com/search/repositories')
print(response.status_code)

422


The request was well-formed, but the server could not process it (422 Unprocessable Entity) because the required search query parameter q was missing.

In [5]:
response = requests.get('https://api.github.com/search/repositories',
                       params={'q': 'data_science+language:python'},
                       headers = {'Accept':'application/vnd.github.v3.text-match+json'}) 
#In the Accept header we specify text-match+json so that the response contains the matching metadata and is returned in JSON format

print(response.status_code)

200


In [6]:
#list the name of the top 5 repos returned by the search:

from IPython.display import Markdown, display
def printmd(string):
    display(Markdown(string))


for item in response.json()['items'][:5]:
    printmd('**' + item['name'] + '**' + ': repository' +
            item['text_matches'][0]['property']+ '-\"*' +
            item['text_matches'][0]['fragment']+ '*\" matched with'+ '***'+
                                 item['text_matches'][0]['matches'][0]['text'] + '***')

**awesome**: repositorydescription-"*Awesome resources on Bioinformatics, data science, machine learning, programming language (Python, Golang, R, Perl) and miscellaneous stuff.*" matched with***data science***

**Python**: repositorydescription-"*this resporatory have ml,ai,nlp,data science etc.python language related material from many websites eg. datacamp,geeksforgeeks,linkedin,youtube,udemy etc. also it include programming challange/competion solutions*" matched with***data science***

**hu-dsf**: repositorydescription-"*Introduction course to data science using the Python programming language in the form of Jupyter Notebooks*" matched with***data science***

**math-server-docker**: repositorydescription-"*The ideal multi-user Data Science server with Jupyterhub and RStudio, ready for Python, R and Julia languages.*" matched with***Data Science***

**python**: repositorydescription-"*A short course introducing students to the Python programming language for data science*" matched with***Python***

Consider a use case where we want to monitor the comments in a repo and ensure they adhere to community guidelines. 

In [7]:
response = requests.get(
    'https://api.github.com/repos/pytorch/pytorch/issues/comments')
print('Response Code', response.status_code)
print('Number of comments', len(response.json()))

Response Code 200
Number of comments 30


Getting only 30 comments from such a popular repo, with many collaborators and users, suggests that we are missing something.

Pagination

    > This is a technique used by APIs to limit the number of elements in a response by returning one page at a time. For GitHub, each page contains 30 results by default.

In [8]:
#The links field in the response object
#provides details on the number of pages in the response

response.links

{'next': {'url': 'https://api.github.com/repositories/65600975/issues/comments?page=2',
  'rel': 'next'},
 'last': {'url': 'https://api.github.com/repositories/65600975/issues/comments?page=1000',
  'rel': 'last'}}

The next field provides us with a URL to the next page, which would contain the next
30 results, while the last field provides a link to the last page, which gives an
indication of how many results there are in total.
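
The links dictionary is built from the HTTP Link header of the response. As a rough sketch of that idea, the snippet below parses a Link header by hand and reads the page number from the last URL. The helper name parse_link_header and the example header string are assumptions made for illustration (the string is modeled on the output shown above); in practice response.links already does this work.

```python
import re
from urllib.parse import urlparse, parse_qs

def parse_link_header(header_value):
    """Parse an HTTP Link header into {rel: {'url': ..., 'rel': ...}}."""
    links = {}
    # Each entry looks like: <https://host/path?page=2>; rel="next"
    for match in re.finditer(r'<([^>]+)>\s*;\s*rel="([^"]+)"', header_value):
        url, rel = match.group(1), match.group(2)
        links[rel] = {'url': url, 'rel': rel}
    return links

# Hypothetical header value, modeled on the response.links output above
example = (
    '<https://api.github.com/repositories/65600975/issues/comments?page=2>; rel="next", '
    '<https://api.github.com/repositories/65600975/issues/comments?page=1000>; rel="last"'
)

links = parse_link_header(example)

# The page query parameter of the last URL hints at the total number of pages
last_page = int(parse_qs(urlparse(links['last']['url']).query)['page'][0])
print(links['next']['url'])
print('Last page:', last_page)
```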

In [9]:
#to get all the results we must implement a function that parses all the results on one page
#and then calls the next URL until the last page has been reached. This is implemented as a
#recursive function:

import pandas as pd 

def get_all_pages(url, params=None, headers=None):
    output_json = []
    response = requests.get(url, params=params, headers=headers)
    if response.status_code == 200:
        output_json = response.json()
        if 'next' in response.links:
            next_url = response.links['next']['url']
            if next_url is not None:
                output_json += get_all_pages(next_url, params, headers)
    return output_json

out = get_all_pages(
    'https://api.github.com/repos/pytorch/pytorch/issues/comments',
    params={
        'since': '2020-07-01T10:00:01Z',  #filter param of time in ISO 8601 format
        'sort': 'created',  #the query parameter is named sort
        'direction': 'desc'
    },
    headers = {'Accept': 'application/vnd.github.v3+json'})

df = pd.DataFrame(out)

print (df['body'].count())
df[['id','created_at','body']].sample(1)

1560


             id            created_at                        body
904  2346634237  2024-09-12T15:38:34Z  @pytorchbot rebase -b main


The dataset we have created here can be used to apply text analytics
blueprints, for example, to identify comments that do not adhere to community
guidelines and flag them for moderation.

It can also be kept up to date by running the extraction at scheduled
time intervals so that the latest comments are always captured.
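
The recursive get_all_pages function makes one nested call per page, so a result set with around a thousand pages (as the last link above suggests) can approach Python's default recursion limit. A loop avoids that. The sketch below is one possible iterative variant, not the original blueprint: get_all_pages_iter, fetch_page and max_pages are hypothetical names, and the pages are made-up stand-ins so the example runs without network access.

```python
def get_all_pages_iter(url, fetch_page, max_pages=None):
    """Collect items page by page in a loop instead of by recursion.

    fetch_page(url) must return (items, next_url); next_url is None on the last page.
    """
    output = []
    pages = 0
    while url is not None and (max_pages is None or pages < max_pages):
        items, url = fetch_page(url)
        output.extend(items)
        pages += 1
    return output

# Made-up stand-in for an API: three pages keyed by a fake URL
fake_pages = {
    'page1': (['a', 'b'], 'page2'),
    'page2': (['c', 'd'], 'page3'),
    'page3': (['e'], None),
}

all_items = get_all_pages_iter('page1', lambda u: fake_pages[u])
first_two_pages = get_all_pages_iter('page1', lambda u: fake_pages[u], max_pages=2)
print(all_items)
print(first_two_pages)
```

With requests, fetch_page could wrap a requests.get call and return response.json() together with response.links.get('next', {}).get('url').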

Rate Limiting

    > This is a technique used by APIs to limit the number of requests that can be made in a certain timeframe. The current limits are reported in the rate limit headers of the response.

We can make a call to the API that retrieves only the headers by using the head
method and then inspect the X-Ratelimit-Limit, X-Ratelimit-Remaining, and
X-RateLimit-Reset header elements:


In [10]:
response = requests.head(
    'https://api.github.com/repos/pytorch/pytorch/issues/comments')

print('X-Ratelimit-Limit', response.headers['X-Ratelimit-Limit'])
print('X-Ratelimit-Remaining', response.headers['X-Ratelimit-Remaining'])

#convert the reset time (seconds since the epoch) to a human-readable local time

import datetime

print(
    'Rate Limits reset at',
    datetime.datetime.fromtimestamp(int(
        response.headers['X-RateLimit-Reset'])).strftime('%c'))


X-Ratelimit-Limit 60
X-Ratelimit-Remaining 3
Rate Limits reset at Fri Sep 13 20:02:28 2024
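
Note that datetime.fromtimestamp without a time zone returns the local time of the machine running the notebook, so the reset time printed above depends on where the code runs. A small sketch of a time-zone-aware conversion follows; reset_time_utc is a hypothetical helper name, and the value '0' (the Unix epoch itself) is used only because its result is easy to check.

```python
from datetime import datetime, timezone

def reset_time_utc(reset_header):
    """Convert an epoch-seconds header value (a string) to an aware UTC datetime."""
    return datetime.fromtimestamp(int(reset_header), tz=timezone.utc)

# '0' is the Unix epoch, chosen only as an easily verified input
epoch = reset_time_utc('0')
print(epoch.isoformat())
```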


The function handle_rate_limits shown next slows down the requests to ensure they are spaced out
over the entire duration. It does so by distributing the remaining requests equally
over the remaining time by applying a sleep function.

This ensures that our data
extraction blueprint respects the rate limits and spaces the requests so that all the
requested data is downloaded:

In [11]:
from datetime import datetime
import time 

def handle_rate_limits(response):
    now = datetime.now()
    reset_time = datetime.fromtimestamp(
        int(response.headers['X-Ratelimit-Reset']))
    #the attribute is headers, and the header name is X-Ratelimit-Remaining
    remaining_requests = response.headers['X-Ratelimit-Remaining']
    remaining_time = (reset_time - now).total_seconds()
    #never sleep for a negative duration if the reset time has already passed
    intervals = max(0.0, remaining_time / (1.0 + int(remaining_requests)))
    print('Sleeping for', intervals)
    time.sleep(intervals)
    return True
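
To make the arithmetic in handle_rate_limits concrete, here is the same calculation as a pure function with made-up numbers. The name sleep_interval and the example values (3 requests left, 600 seconds until the reset) are assumptions for illustration only.

```python
def sleep_interval(remaining_requests, seconds_until_reset):
    """Seconds to wait so the remaining requests are spread over the time left."""
    # Clamp at zero in case the reset time has already passed
    return max(0.0, seconds_until_reset / (1.0 + remaining_requests))

# Made-up values: 3 requests left and 600 seconds until the limit resets
interval = sleep_interval(remaining_requests=3, seconds_until_reset=600)
print(interval)
```

Dividing by one more than the number of remaining requests leaves a gap after the final request as well, so the last call does not land exactly on the reset time.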

A retry strategy allows API
calls to be retried in case of specified failure conditions. It can be implemented
with the HTTPAdapter class, which allows more fine-grained control of the underlying
HTTP connections being made.

Specify a backoff_factor value to increase the delay between each retry, and thus avoid hammering the server. Then mount a custom adapter that implements the retry strategy on a Session object, overriding its default connection behavior.
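
To get a feel for what backoff_factor does, the sketch below computes an exponential delay schedule. It follows the formula described in the urllib3 documentation as I understand it (no delay before the first retry, then backoff_factor * 2 ** (retry number - 1), capped at a maximum); the exact behavior may differ between urllib3 versions, and backoff_delays is a hypothetical helper, not part of urllib3 or requests.

```python
def backoff_delays(backoff_factor, retries, backoff_max=120.0):
    """Approximate the sleep time before each retry under exponential backoff."""
    delays = []
    for attempt in range(1, retries + 1):
        if attempt == 1:
            # Assumption: the first retry happens immediately
            delay = 0.0
        else:
            delay = backoff_factor * (2 ** (attempt - 1))
        # Assumption: delays are capped at backoff_max seconds
        delays.append(min(backoff_max, float(delay)))
    return delays

schedule = backoff_delays(backoff_factor=1, retries=5)
print(schedule)
```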

In [12]:
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry_strategy = Retry(
    total=5,  #retry up to 5 times for a failed request
    status_forcelist = [500, 503, 504], #retry on these specific server errors
    backoff_factor = 1 #the delay between retries grows exponentially from this factor
)

retry_adapter = HTTPAdapter(max_retries=retry_strategy)

http = requests.Session()
http.mount('https://', retry_adapter)  #custom retry adapter
http.mount('http://', retry_adapter) 

response = http.get('https://api.github.com/search/repositories',
                    params={'q': 'data_science+language:python'})

for item in response.json()['items'][:5]:
    print(item['name'])

awesome
Python
hu-dsf
math-server-docker
python


Putting all this together, we can modify our blueprint to handle pagination, rate limits and retries:


In [13]:
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry_strategy = Retry(
    total =5,
    status_forcelist=[500,503,504],
    backoff_factor = 1
)

retry_adapter = HTTPAdapter(max_retries = retry_strategy)

http = requests.Session()
http.mount('https://', retry_adapter)
http.mount('http://', retry_adapter)

def get_all_pages(url, param=None, header=None):
    output_json = []
    response = http.get(url, params=param, headers=header)
    if response.status_code == 200:
        output_json = response.json()
        if 'next' in response.links:
            next_url = response.links['next']['url']
            if (next_url is not None) and (handle_rate_limits(response)):
                output_json += get_all_pages(next_url, param, header)

    return output_json 

Summary:

Unauthenticated requests have a lower rate limit. You can identify your data extraction app to GitHub by registering an account and then making authenticated requests to the API, which increases the rate limit.

The blueprint above shows how to extract data from an API using the simple requests module and how to create your own dataset.
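
As a minimal sketch of what an authenticated request could look like, the helper below builds request headers that carry a personal access token. The function name build_github_headers and the GITHUB_TOKEN environment variable are assumptions for illustration; the resulting dictionary would be passed as the headers argument of requests.get or get_all_pages.

```python
import os

def build_github_headers(token=None):
    """Return request headers, adding an Authorization header when a token is given."""
    headers = {'Accept': 'application/vnd.github.v3+json'}
    if token:
        # GitHub accepts a personal access token sent as a Bearer token
        headers['Authorization'] = f'Bearer {token}'
    return headers

# GITHUB_TOKEN is an assumed environment variable name, not something GitHub sets for you
headers = build_github_headers(os.environ.get('GITHUB_TOKEN'))
print(sorted(headers))
```

Reading the token from the environment keeps it out of the notebook itself.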

-------------------------------------------------------------------------------------------------

Blueprint: Extracting Twitter Data with Tweepy

Installing and Configuring Tweepy

The first step is to authenticate the app with the Twitter API.
We can do this with the help of tweepy.AppAuthHandler, to which we pass
the API key and API secret key obtained in the previous step (see the documentation).

Finally, we instantiate
the tweepy.API class, which will be used to make all subsequent calls to the Twitter
API. Once the connection is made, we can confirm the host and version of the
API object. The cell below takes the bearer-token route instead: it creates a
tweepy.Client and prints the client object and the Tweepy version.

In [1]:
import tweepy 

bearer_token = 'AAAAAAAAAAAAAAAAAAAAAAxrvwEAAAAAIIloa2%2FF9gZIbf3nRUgGBjaD3tc%3DvT47j9XLLH99dpoVUFWI8L8mhBa3M5vwvqvYcDW3XkxdychNIo'

client = tweepy.Client(bearer_token)


print('Client:', client)

print(tweepy.__version__)


Client: <tweepy.client.Client object at 0x00000233EDA53EC0>
4.14.0


Extracting Data from the Search API

In [2]:
import time 

username = 'afiadata_ke'

def fetch_user(username):
    try:
        user = client.get_user(username=username, user_fields=['public_metrics'])
        print(f'User ID: {user.data.id}')
        print(f'Name: {user.data.name}')
        print(f'Username: {user.data.username}')
        if user.data.public_metrics:
            #double quotes inside the f-string keep this valid before Python 3.12
            print(f'Followers Count: {user.data.public_metrics["followers_count"]}')
            print(f'Following Count: {user.data.public_metrics["following_count"]}')
        else:
            print('Public metrics are not available.')
    except tweepy.TooManyRequests as e:
        #handle the rate limit being exceeded
        reset_time = getattr(e, 'retry_after', 60)  #fall back to 60 seconds
        print(f'Rate limit exceeded. Waiting for {reset_time} seconds')
        time.sleep(reset_time)
        fetch_user(username)  #retry the request after waiting

fetch_user(username)

User ID: 1778002463053737984
Name: Afiadata
Username: afiadata_ke
Followers Count: 398
Following Count: 19


Twitter restricts access to tweet retrieval: the code below, which fetches tweets, runs only on premium accounts. Please check the X (Twitter) API documentation for more details.

In [3]:
#search_term = 'afiadata'

#tweets = tweepy.Cursor(api.search_tweets, q = search_term, lang='en').items(100)

#retrieved_tweets = [tweet._json for tweet in tweets] #load the results as a  json object and return a list
#df = pd.json_normalize(retrieved_tweets)  #return a dataframe from the json object

#df[['text']].sample(3)

Say we want to filter out retweets and retrieve the full text of all tweets. The standard search API searches only a sample of recent tweets published in the past 7 days. The count parameter is the number of tweets that can be retrieved in one call.

Note: This is alternative code to use once you have premium access:

In [4]:
################################
#api = tweepy.API(auth,
#                wait_on_rate_limit=True,  #wait_on_rate_limit_notify was removed in Tweepy 4
#                retry_count=5,
#                retry_delay=10)

#search_term ='afiadata_ke OR afiadata -filter:retweets'

#tweets = tweepy.Cursor(api.search_tweets, q = search_term,
#                        lang='en',
#                       tweet_mode='extended',
#                       count=30).items(12000)  #limit to 12000 tweets. We can retrieve 30 per call

#retrieved_tweets = [tweet._json for tweet in tweets] #load the results as a  json object and return a list

#df = pd.json_normalize(retrieved_tweets)
#print('Number of retrieved tweets', len(df))

#df[['created_at','full_text','entities.hashtags']].sample(2) #extract the relevant fields 

################################

Twitter also returns several entities such
as hashtags contained within the tweet, and it would be interesting to see which hashtags
are used heavily when discussing afiadata:

In [5]:
#import numpy as np

#def extract_entities(entity_list):
#    entities = set()
#    if len(entity_list) != 0:
#        for item in entity_list:
#            for key,value in item.items():
#                if key== 'text':
#                    entities.add(value.lower())
#    return list(entities)

#df['Entities'] = df['entities.hashtags'].apply(extract_entities)
#pd.Series(np.concatenate(df['Entities'])).value_counts()[:25].plot(kind='barh', figsize=(12,8))
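
Because the cell above is commented out, here is a small offline sketch of the same counting idea using only the standard library. The sample hashtag entities are made up, and collections.Counter stands in for the pandas value_counts call; the input mimics the entities.hashtags lists assumed in the commented code.

```python
from collections import Counter

def extract_entities(entity_list):
    """Return the unique, lower-cased hashtag texts of one tweet."""
    return sorted({item['text'].lower() for item in entity_list if 'text' in item})

# Made-up hashtag entities for three tweets
sample = [
    [{'text': 'Data'}, {'text': 'Python'}],
    [{'text': 'data'}, {'text': 'DATA'}],
    [],
]

# Each hashtag is counted at most once per tweet
counts = Counter(tag for tweet in sample for tag in extract_entities(tweet))
print(counts.most_common(2))
```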

Extracting Data from a User's Timeline

NOTE: Check the access level your app has to the Twitter API to determine which Tweepy classes you can use. Some methods apply to premium accounts only and therefore serve no purpose for free accounts.

In [6]:
#Assuming you were on Premium Access: (otherwise you cannot retrieve without premium access)

#api = tweepy.API(auth, wait_on_rate_limit=True)

#tweets = tweepy.Cursor(api.user_timeline,
#                       screen_name = 'afiadata_ke',
#                       lang='en',
#                       tweet_mode='extended',
#                       count =50).items(200)
#retrieved_tweets = [tweet._json for tweet in tweets]
#df = pd.json_normalize(retrieved_tweets)  #pd.io.json.json_normalize is deprecated
#print('Number of retrieved tweets', len(df))


In [7]:
#def get_user_timeline(screen_name):  #screen name is the handle
#    api = tweepy.API(auth,
#                     wait_on_rate_limit=True)
#    tweets = tweepy.Cursor(api.user_timeline,
#                           screen_name=screen_name,
#                           lang="en",
#                           tweet_mode='extended',
#                           count=200).items()
#    retrieved_tweets = [tweet._json for tweet in tweets]
#    df = pd.json_normalize(retrieved_tweets)
#    df = df[df['retweeted_status.id'].isna()]  #keep only tweets that are not retweets
#    return df

#df = get_user_timeline('afiadata_ke')
#print('Number of retrieved tweets', len(df))

One of the quirks of the Tweepy implementation is that in the case
of retweets, the full_text column is truncated, and the retweeted_status.full_text column must be used to retrieve all the
characters of the tweet. 

For our use case, retweets are not important,
and we filter them by checking if retweeted_status.id is
empty. However, depending on the use case, you can add a condition
to replace the column full_text with retweeted_status.full_text in the case of retweets.
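
The replacement mentioned above could look roughly like the sketch below, which works on plain tweet dictionaries rather than a DataFrame so that it runs without API access. The function name full_text_of and both sample tweets are made up; the field names follow the full_text and retweeted_status fields described in the text.

```python
def full_text_of(tweet):
    """Return the untruncated text, preferring the retweeted status for retweets."""
    retweeted = tweet.get('retweeted_status')
    if retweeted and retweeted.get('full_text'):
        return retweeted['full_text']
    return tweet.get('full_text', '')

# Made-up examples: one original tweet and one retweet with truncated text
original = {'full_text': 'An original tweet'}
retweet = {
    'full_text': 'RT @someone: A long twe...',
    'retweeted_status': {'full_text': 'A long tweet that was cut off'},
}

texts = [full_text_of(t) for t in (original, retweet)]
print(texts)
```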

You can create a word cloud as shown earlier, or import the wordcloud function from the previous chapter's file, to identify the keywords being used.

---------------------------------------------------------------------------------

Extracting Data from the Streaming API

In [8]:
from datetime import datetime
import math 

bearer_token = bearer_token

access_token = '555015082-3aHs9R2YlpqrFJLuuFyaPpyUptBaJP7L0VVWm3pV'

access_token_secret = 'FFEmTmkq3vNPJwCzBhKjzQaim758Wpip0lupSv8A59Ev1'

consumer_key = 'KVUHB4pZsxxYe8DHH72yI9ZfU'

consumer_secret = 'PGUBmcOzX1vofA467qJ1PYXf4vmwKM0FcnggoRyIciq7viRc58' 



class MyStream(tweepy.StreamingClient):
    def __init__(self, bearer_token, max_tweets=math.inf):
        super().__init__(bearer_token)
        self.num_tweets = 0
        self.max_tweets = max_tweets

    def on_tweet(self, tweet): #method is called when a tweet is received
        if self.num_tweets < self.max_tweets:
            print(f'{datetime.now()}: {tweet.text}')
            self.num_tweets +=1

        else:
            self.disconnect()  #quit streaming if max_tweets is reached

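    def on_request_error(self, status_code): #called when the stream gets a non-200 HTTP response
        if status_code == 429:  #429 Too Many Requests signals a rate limit in API v2
            print('Rate limit reached. Waiting...')
        else:
            print(f'Error received: {status_code}')
        #the stream keeps retrying the connection after this method returns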


In [17]:
##provide user authentication tokens and initialize the stream

auth = tweepy.OAuth1UserHandler(consumer_key, consumer_secret, access_token, access_token_secret)

api = tweepy.API(auth)

stream = MyStream(bearer_token=bearer_token, max_tweets=50)

#add a rule for filtering tweets 
#stream.add_rules(tweepy.StreamRule('Python')) --> this works only on premium, upgrade your package to run the code

stream.filter(tweet_fields = ['context_annotations', 'created_at'])

#upgrade package to basic or premium to access these features


Stream encountered HTTP error: 403
HTTP error response text: {"client_id":"29321996","detail":"When authenticating requests to the Twitter API v2 endpoints, you must use keys and tokens from a Twitter developer App that is attached to a Project. You can create a project via the developer portal.","registration_url":"https://developer.twitter.com/en/docs/projects/overview","title":"Client Forbidden","required_enrollment":"Appropriate Level of API Access","reason":"client-not-enrolled","type":"https://api.twitter.com/2/problems/client-forbidden"}

In [12]:

import tweepy

# Your keys and tokens
bearer_token = bearer_token

access_token = '555015082-3aHs9R2YlpqrFJLuuFyaPpyUptBaJP7L0VVWm3pV'

access_token_secret = 'FFEmTmkq3vNPJwCzBhKjzQaim758Wpip0lupSv8A59Ev1'

consumer_key = 'KVUHB4pZsxxYe8DHH72yI9ZfU'

consumer_secret = 'PGUBmcOzX1vofA467qJ1PYXf4vmwKM0FcnggoRyIciq7viRc58' 

# Authenticate to Twitter using OAuth1
auth = tweepy.OAuth1UserHandler(consumer_key, consumer_secret, access_token, access_token_secret)
api = tweepy.API(auth)

# Post a tweet
try:
    api.update_status("First tweet from our API!")
    print("Tweet posted successfully!")
except tweepy.TweepyException as e:
    print(f"Error: {e}")


#upgrade package to basic from free plan 


Error: 403 Forbidden
453 - You currently have access to a subset of Twitter API v2 endpoints and limited v1.1 endpoints (e.g. media post, oauth) only. If you need access to this endpoint, you may need a different access level. You can learn more here: https://developer.twitter.com/en/portal/product


For a publicly available data source such as Wikipedia, authentication and the generation of access tokens are not necessary.

In [17]:
user_agent = 'NLPTraining/1.0(dwayneqlint1@gmail.com)' 

#a user agent is required; it typically includes the name of the app, a version number, and a way to contact the developer

In [19]:
import wikipediaapi 

wiki_wiki = wikipediaapi.Wikipedia(
    language = 'en',
    extract_format = wikipediaapi.ExtractFormat.WIKI,
    user_agent=user_agent
)

p_wiki = wiki_wiki.page('Cryptocurrency')
print(p_wiki.text[:200], '...')

A cryptocurrency, crypto-currency, or crypto is a digital currency designed to work as a medium of exchange through a computer network that is not reliant on any central authority, such as a governmen ...
