# Json

In [None]:
import json

dog_data = {
  "name": "Frieda",
  "is_dog": True,
}

print(json.dumps(dog_data, indent = 2))

# Use json.dump (not dumps) when writing to file
with open("../../interview_prep/exercises/practice/hello_frieda.json", mode="w", encoding="utf-8") as write_file:
    json.dump(dog_data, write_file)

In [18]:
with open("../../interview_prep/exercises/practice/hello_frieda.json", mode="r", encoding="utf-8") as read_file:
    frie_data = json.load(read_file)
frie_data

{'name': 'Frieda', 'is_dog': True}

In [20]:
json.loads(json.dumps(dog_data, indent = 2))

{'name': 'Frieda', 'is_dog': True}

# Requests

In [None]:
import requests

In [24]:
response = requests.get("https://api.github.com")
if response: # Checks response codes 
    print("Success!")
else:
    raise Exception(f"Non-success status code: {response.status_code}")

In [182]:
for url in ["https://api.github.com", "https://api.github.com/invalid"]:
    try:
        response = requests.get(url)
        response.raise_for_status() # If you invoke .raise_for_status(), then Requests will raise an HTTPError for status codes between 400 and 600.
    except requests.exceptions.RequestException as req_err:
        print(f"HTTP error occurred: {req_err}")
    except Exception as err:
        print(f"Other error occurred: {err}")
    else:
        print("Success!")

Success!
HTTP error occurred: 404 Client Error: Not Found for url: https://api.github.com/invalid


In [50]:
# Extract data from requests
response = requests.get("https://api.github.com")
# response.encoding = "utf-8"  # Optional: Requests infers this.
# response.text
# response.headers
type(response.json())

dict

In [74]:
# Query string parameters
response = requests.get(
    "https://api.github.com/search/repositories",
    params={"q": "language:python", "sort": "stars", "order": "desc"},
)
# List and bytes work for params too
# ("q", "language:python"), ("sort", "stars"), ("order", "desc")],
# b"q=language:python&sort=stars&order=desc"

In [76]:
# Pass in headers, Accept tells the server what content types your application can handle
response = requests.get(
    "https://api.github.com/search/repositories",
    params={"q": '"real python"'},
    headers={
        "Accept": "application/vnd.github.text-match+json", 
        "Authorization" : "<api_key_here>"
    },
)
json_response = response.json()
first_repository = json_response["items"][0]
print(first_repository["text_matches"][0]["matches"])

[{'text': 'Real Python', 'indices': [23, 34]}]


In [None]:
# Other HTTP methods
requests.get("https://httpbin.org/get") # equivalent to requests.request("GET", "https://httpbin.org/get")
requests.post("https://httpbin.org/post", data={"key": "value"})
requests.put("https://httpbin.org/put", data={"key": "value"})
requests.delete("https://httpbin.org/delete")
requests.head("https://httpbin.org/get")
requests.patch("https://httpbin.org/patch", data={"key": "value"})
requests.options("https://httpbin.org/get")

In [None]:
# Message body
requests.post("https://httpbin.org/post", data={"key": "value"}) # If your request’s content type is application/x-www-form-urlencoded
response = requests.post("https://httpbin.org/post", json={"key": "value"}) # To send JSON data
json_response = response.json()
json_response["data"] # data lives here
# Details about prepared request that is sent
response.request.url
response.request.body

In [None]:
response = requests.get(
    "https://httpbin.org/basic-auth/user/passwd",
    auth=("user", "passwd")
)
response.request.headers["Authorization"]
# 'Basic dXNlcjpwYXNzd2Q='
from requests.auth import HTTPBasicAuth # HTTPDigestAuth, HTTPProxyAuth
response = requests.get(
    "https://httpbin.org/basic-auth/user/passwd",
    auth=HTTPBasicAuth("user", "passwd")
)

In [165]:
response.request.headers

{'User-Agent': 'python-requests/2.32.3', 'Accept-Encoding': 'gzip, deflate, br', 'Accept': '*/*', 'Connection': 'keep-alive', 'Authorization': 'Basic YXBpa2V5OklMb3ZlOTlEb2xsYXJz'}

In [None]:
token = "<YOUR_GITHUB_PA_TOKEN>"
response = requests.get(
    "https://api.github.com/user",
    auth=("", token)
)
# The correct way to authenticate with a bearer token
from requests.auth import AuthBase
class TokenAuth(AuthBase):
    def __init__(self, token): # AuthBase class does not have initialization logic
        self.token = token
    def __call__(self, request):
        request.headers["Authorization"] = f"Bearer {self.token}"
        return request
response = requests.get(
    "https://api.github.com/user",
    auth=TokenAuth(token)
)

In [None]:
# Turn off SSL Certificate Verification
requests.get("https://api.github.com", verify=False)
response = requests.get('https://github.com', verify ='/path/to/certfile') # Can pass the link to the certificate for validation

In [None]:
# Timeout
requests.get("https://api.github.com", timeout=1)
from requests.exceptions import Timeout
try:
    response = requests.get("https://api.github.com", timeout=(3.05, 5)) # Connect, read
except Timeout:
    print("The request timed out")
else:
    print("The request did not time out")

In [None]:
# Session - to persist parameters across requests
with requests.Session() as session:
    session.auth = TokenAuth(TOKEN)
    first_response = session.get("https://api.github.com/user")
    second_response = session.get("https://api.github.com/user")

In [178]:
from requests.adapters import Retry

In [None]:
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
github_adapter = HTTPAdapter(max_retries=2)
session = requests.Session()
session.mount("https://api.github.com", github_adapter)
try:
    response = session.get("https://api.github.com/")
except RetryError as err:
    print(f"Error: {err}")
finally:
    session.close()

1. Persistent Connections: With a Session, you maintain persistent connections, which means the underlying TCP connection will be reused for multiple requests to the same server. This can improve performance because it avoids the overhead of re-establishing a new connection with each request.
2. Session-specific settings: You can set things like headers, cookies, or authentication on the session itself, and they will be automatically included in every request made through that session. This can help avoid repeating certain configuration across requests.
3. Better Error Handling: Sessions can handle some of the internal state management for you, like cookie handling and keeping track of redirects.

In [None]:
import time

max_duration = 60  # 1 minute
start_time = time.time()
url = "https://your-api-endpoint.com"
session = requests.Session()

while True:
    elapsed_time = time.time() - start_time
    try:
        response = session.get(url) 
        if response:
            print("Request successful!")
            break  
        else:
            # Error is not thrown because we don't do response.raise_for_status()
            print(f"Request failed with status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
    if elapsed_time > max_duration:
        print("Max duration reached. Terminating...")
        break
    time.sleep(1)

session.close()

In [None]:
def get_user_transaction_totals(location, transaction_type):
    base_url = "https://jsonmock.hackerrank.com/api/transactions/search"
    page = 1
    user_totals = {}

    while True:
        # Fetch data from the API
        params = {
            "txnType": transaction_type,
            "page": page
        }
        response = requests.get(base_url, params=params)
        data = response.json()
        
        # Process transaction data
        for transaction in data['data']:
            if transaction['location']['address'] == location:
                user_id = int(transaction['userId'])
                amount = float(transaction['amount'][1:].replace(",", ""))  # Remove $ and commas
                user_totals[user_id] = user_totals.get(user_id, 0) + amount
        
        # Check if there are more pages
        if page >= data['total_pages']:
            break
        page += 1

    # Prepare the result: truncate amounts and sort by user ID
    result = [[user_id, int(user_totals[user_id])] for user_id in sorted(user_totals)]
    return result

# ArgParse

In [None]:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("path")
args = parser.parse_args()

In [None]:
parser = argparse.ArgumentParser(description="Fetch exchange rates.")
parser.add_argument('start_date', help='Start date (YYYY-MM-DD)')
parser.add_argument('end_date', help='End date (YYYY-MM-DD)')
parser.add_argument('--from_currency', type=str, default='USD', help='Currency to convert from (default: USD)')
parser.add_argument('--to_currency', type=str, default='EUR', help='Currency to convert to (default: EUR)')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')

args = parser.parse_args()

print(args.start_date, args.end_date, args.from_currency, args.to_currency, args.verbose)

# Example invocations
# python script.py 2025-01-01 2025-02-01 --from_currency USD --to_currency EUR --verbose
# python script.py 2025-01-01 2025-02-01

# Testing

## Assertions

In [None]:
import pytest
from io import StringIO
import sys

def some_function():
    raise ValueError("An error occurred")

def test_some_function():
    with pytest.raises(ValueError):
        some_function()

def some_function():
    print("Hello, world!")

def test_print_output():
    captured_output = StringIO()
    sys.stdout = captured_output
    some_function()
    sys.stdout = sys.__stdout__  # Restore original stdout
    assert captured_output.getvalue() == "Hello, world!\n"

def some_function():
    logging.error("This is an error")

def test_logging():
    with pytest.raises(Exception):  # To ensure logging happens
        with pytest.capture_logs() as logs:
            some_function()
        assert "This is an error" in logs[0].message

In [None]:
import unittest

def some_function():
    raise ValueError("An error occurred")

class TestSomeFunction(unittest.TestCase):
    def test_value_error(self):
        with self.assertRaises(ValueError):
            some_function()

def some_function():
    print("Hello, world!")

class TestPrintOutput(unittest.TestCase):
    def test_print(self):
        captured_output = StringIO()
        sys.stdout = captured_output
        some_function()
        sys.stdout = sys.__stdout__  # Restore original stdout
        self.assertEqual(captured_output.getvalue(), "Hello, world!\n")

def some_function():
    logging.error("This is an error")

class TestLogging(unittest.TestCase):
    def test_logging(self):
        logger = logging.getLogger()
        with self.assertLogs(logger, level="ERROR") as log:
            some_function()
        self.assertIn("This is an error", log.output[0])

In [None]:
import torch
import unittest

class SelfAttention(nn.Module):
    pass
    
class TestSelfAttention(unittest.TestCase):
    def test_output_shape(self):
        custom_output = self.custom_attention(self.x)
        torch_output, _ = self.torch_attention(self.x, self.x, self.x)

        self.assertEqual(custom_output.shape, torch_output.shape)
        self.assertEqual(custom_output.shape, (self.batch_size, self.seq_len, self.embed_dim))

    def test_output_values(self):
        with torch.no_grad():
            custom_output = self.custom_attention(self.x)
            torch_output, _ = self.torch_attention(self.x, self.x, self.x)

            torch.testing.assert_close(
                custom_output,
                torch_output,
                rtol=1e-4,
                atol=1e-4
            )

# Run the tests
if __name__ == "__main__":
    suite = unittest.TestLoader().loadTestsFromTestCase(TestSelfAttention)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)

In [13]:
import unittest

class TestStringMethods(unittest.TestCase):

    def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')

    def test_isupper(self):
        self.assertTrue('FOO'.isupper())
        self.assertFalse('Foo'.isupper())

    def test_split(self):
        s = 'hello world'
        self.assertEqual(s.split(), ['hello', 'world'])
        # check that s.split fails when the separator is not a string
        with self.assertRaises(TypeError):
            s.split(2)

suite = unittest.TestLoader().loadTestsFromTestCase(TestStringMethods)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)

test_isupper (__main__.TestStringMethods.test_isupper) ... ok
test_split (__main__.TestStringMethods.test_split) ... ok
test_upper (__main__.TestStringMethods.test_upper) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.004s

OK


<unittest.runner.TextTestResult run=3 errors=0 failures=0>

In [31]:
unittest.main(argv=[''], verbosity=2, exit=False)
# This is the key
# argv=['']: Prevents unittest from interpreting the notebook's arguments (which would cause errors).
# verbosity=2: Provides more detailed output.
# exit=False: Prevents unittest from closing the notebook kernel when tests are finished.

test_isupper (__main__.TestStringMethods.test_isupper) ... ok
test_split (__main__.TestStringMethods.test_split) ... ok
test_upper (__main__.TestStringMethods.test_upper) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.005s

OK


<unittest.main.TestProgram at 0x11a7a8d70>

In [None]:
def func(x):
    return x + 1


def test_answer():
    assert func(3) == 5

In [None]:
!pytest

In [None]:
%load_ext pytest

# Now run pytest
%pytest