# 01 Pickle

In [1]:
# Setup
import pickle
from datetime import datetime

In [2]:
# Solution part 1
from urllib.request import urlopen
from tempfile import TemporaryFile

# Three objects of unrelated types collected in `unpickle`; the tests in this
# section expect pickle.dumps() to fail on every one of them.
tmp = TemporaryFile('w+b')  # open temporary file object (binary read/write)
web = urlopen('http://example.com')  # open HTTP response; needs network access
anon = lambda: None  # anonymous function

unpickle = [tmp, web, anon]

In [3]:
# Solution part 2
class Person:
    """Simple record whose ``created_at`` stamp is refreshed on unpickling."""

    def __init__(self, name, address, favorite_color):
        """Store the three fields and stamp the instance with the current time."""
        self.name, self.address, self.favorite_color = name, address, favorite_color
        self.created_at = datetime.now()

    def __setstate__(self, data):
        """Restore pickled attributes, then overwrite ``created_at`` with now.

        ``data`` is adopted as the instance ``__dict__`` (not copied).
        """
        self.__dict__ = data
        # The pickled timestamp is stale; replace it with the load time.
        self.created_at = datetime.now()


person = Person('David', '45 Main St', "Red")

## Tests

In [4]:
def test_listed_types():
    """Require at least three objects in ``unpickle``, no two of related types."""
    from itertools import combinations
    assert len(unpickle) >= 3, "Provide at least 3 objects"
    for a, b in combinations(unpickle, 2):
        type_a, type_b = type(a), type(b)
        # Neither type may be the same as, or derive from, the other.
        assert not issubclass(type_a, type_b), f"issubclass({a}, {b}) not permitted"
        assert not issubclass(type_b, type_a), f"issubclass({b}, {a}) not permitted"


test_listed_types()

In [5]:
def test_pickle_failure():
    for obj in unpickle:
        pickled = False
        try:
            pkl = pickle.dumps(obj)
            pickled = True
        except:
            pass
            
        if pickled:
            assert False, f"{repr(obj)} should not be pickleable"

            
test_pickle_failure()

In [6]:
def test_person_attributes():
    """``person`` exposes all four fields and ``created_at`` is a datetime."""
    for field in ('name', 'address', 'favorite_color', 'created_at'):
        assert hasattr(person, field)
    assert isinstance(person.created_at, datetime)


test_person_attributes()

In [7]:
def test_deserialize():
    """Unpickling ``person`` keeps its fields but refreshes ``created_at``."""
    payload = pickle.dumps(person)
    before_load = datetime.now()
    restored = pickle.loads(payload)
    after_load = datetime.now()
    # Strict ordering: original stamp, load start, refreshed stamp, load end.
    assert person.created_at < before_load < restored.created_at < after_load, \
        "Time must be refreshed when record is deserialized"
    for field in ('name', 'address', 'favorite_color'):
        assert getattr(person, field) == getattr(restored, field)


test_deserialize()

# 02 Basic JSON

In [8]:
# Setup
import json

class Person:
    """Exercise stub: a person record with placeholder JSON methods."""

    def __init__(self, name=None, address=None, favorite_color=None):
        """Store the three fields; each defaults to None."""
        self.name = name
        self.address = address
        self.favorite_color = favorite_color

    def toJSON(self):
        """Placeholder: returns a fixed string, not real JSON."""
        placeholder = "JSON string here"
        return placeholder

    def fromJSON(self, jstr):
        """Placeholder: ignores ``jstr`` and returns a fixed string."""
        placeholder = "Return new instance"
        return placeholder


Red = "Red"
person = Person('David', '45 Main St', Red)

In [9]:
# Deliberately malformed JSON for the exercise: the address is single-quoted,
# Red is an unquoted bare word, and a trailing comma follows the last member.
bad_json = """
{"name": "David",
 "address": '45 Main St',
 "favorite_color": Red,
}
"""
# Starting point for the exercise: good_json initially aliases the bad text.
good_json = bad_json

In [10]:
# Solution part 1
# Valid JSON object: every string is double-quoted and there is no trailing
# comma after the last member.
good_json = """
{"name": "David",
 "address": "45 Main St",
 "favorite_color": "Red"
}
"""

In [11]:
# Solution part 2
# Keep a handle on the previously defined Person so it can be extended under
# the same public name.
_Person = Person


class Person(_Person):
    """Person that can be serialized to, and rebuilt from, a JSON object."""

    def toJSON(self):
        """Return the instance attributes as a JSON object string."""
        return json.dumps(self.__dict__)

    def fromJSON(self, jstr):
        """Build a new instance of this object's class from a JSON string.

        The decoded members are merged into a default-constructed instance, so
        attributes missing from ``jstr`` keep their constructor defaults
        instead of being absent from the result.

        Args:
            jstr: JSON text encoding an object.

        Returns:
            A new instance of ``self.__class__``.

        Raises:
            json.JSONDecodeError: If ``jstr`` is not valid JSON.
            TypeError: If ``jstr`` does not decode to a JSON object.
        """
        fields = json.loads(jstr)
        if not isinstance(fields, dict):
            raise TypeError(
                f"expected a JSON object, got {type(fields).__name__}")
        new = self.__class__()
        new.__dict__.update(fields)
        return new


person = Person('David', '45 Main St', Red)

## Tests

In [12]:
def test_json_fixed():
    """``good_json`` must parse to exactly the expected person members.

    Text that fails to parse is treated as an empty object so the problem is
    reported by the assertions below.
    """
    try:
        person = json.loads(good_json)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers a good_json
        # that is not text. Anything else (e.g. KeyboardInterrupt) propagates.
        person = dict()
    assert set(person.keys()) == {'name', 'address', 'favorite_color'}
    assert person.get('name') == 'David'
    assert person.get('address') == "45 Main St"
    assert person.get('favorite_color') == Red


test_json_fixed()

In [13]:
def test_round_trip():
    """A Person survives ``toJSON`` then ``fromJSON`` with its name intact."""
    from random import randint
    # Randomized name so a hard-coded result cannot pass.
    original = Person(f"Jane-{randint(1, 1000)}", "123 Any Road", "Green")
    jstr = original.toJSON()
    try:
        json.loads(jstr)
    except json.JSONDecodeError:
        assert False, "Invalid JSON string produced"
    restored = original.fromJSON(jstr)
    assert isinstance(restored, Person), f"Not a Person: {repr(restored)}"
    assert original.name == restored.name


test_round_trip()

# 03 Sharing JSON

# 04 JSON Schema

## Setup

In [14]:
import json
from jsonschema import validate, ValidationError

def is_valid(instance, schema):
    """Return True if ``instance`` conforms to ``schema``, otherwise False.

    Args:
        instance: Decoded JSON value to check.
        schema: JSON Schema, as a dict, to validate against.

    Returns:
        bool: False only when validation reports a ``ValidationError``.

    Any other exception raised by ``validate`` propagates to the caller
    rather than being reported as an ordinary validation failure.
    """
    try:
        validate(instance, schema)
    except ValidationError:
        return False
    return True
    
# Sample documents for the schema exercise.
# json1: seven guitarists, NumAlbums 18, Formation given as a number.
json1 = """{
    "Band": "Pere Ubu",
    "Guitarists": ["Michele Temple", "Keith Moliné",
                   "Peter Laughner", "Tom Herman",
                   "Mayo Thompson", "Jim Jones",
                   "Wayne Kramer"],
    "NumAlbums": 18,
    "Formation": 1975
}"""

# json2: two guitarists, NumAlbums 7, no Formation member.
json2 = """{
    "Band": "L7",
    "Guitarists": ["Suzi Gardner", "Donita Sparks"],
    "NumAlbums": 7
}"""

# json3: two guitarists, NumAlbums 4, Formation given as a string.
json3 = """{
    "Band": "The Runaways",
    "Guitarists": ["Joan Jett", "Lita Ford"],
    "NumAlbums": 4,
    "Formation": "1975"
}"""

# Starting-point schemas for the exercise; each only constrains the top-level type.
schema1 = {"type": "object"}
schema2 = {"type": "string"}
schema3 = {"type": "array"}

## Solution

In [15]:
# schema1: NumAlbums must be a number of at least 5.
schema1 = {
    "type": "object",
    "properties": {
        "NumAlbums": {
            "type": "number",
            "minimum": 5
        }
    }
}
# schema2: Guitarists must be an array of at most 3 items.
schema2 = {
    "type": "object",
    "properties": {
        "Guitarists": {
             "type": "array",
             "maxItems": 3
        }
    }
}
# schema3: the Formation member must be present (its type is not constrained).
schema3 = {
    "type": "object",
    "required": ["Formation"]
}

## Tests

In [16]:
def test_schema1():
    """schema1 must accept json1 and json2 and reject json3."""
    accepts_json1 = is_valid(json.loads(json1), schema1)
    assert accepts_json1, "schema1 must accept json1"
    accepts_json2 = is_valid(json.loads(json2), schema1)
    assert accepts_json2, "schema1 must accept json2"
    rejects_json3 = not is_valid(json.loads(json3), schema1)
    assert rejects_json3, "schema1 must reject json3"


test_schema1()

In [17]:
def test_schema2():
    """schema2 must reject json1 and accept json2 and json3."""
    rejects_json1 = not is_valid(json.loads(json1), schema2)
    assert rejects_json1, "schema2 must reject json1"
    accepts_json2 = is_valid(json.loads(json2), schema2)
    assert accepts_json2, "schema2 must accept json2"
    accepts_json3 = is_valid(json.loads(json3), schema2)
    assert accepts_json3, "schema2 must accept json3"


test_schema2()

In [18]:
def test_schema3():
    """schema3 must accept json1 and json3 and reject json2."""
    accepts_json1 = is_valid(json.loads(json1), schema3)
    assert accepts_json1, "schema3 must accept json1"
    rejects_json2 = not is_valid(json.loads(json2), schema3)
    assert rejects_json2, "schema3 must reject json2"
    accepts_json3 = is_valid(json.loads(json3), schema3)
    assert accepts_json3, "schema3 must accept json3"


test_schema3()

# 05 CSV Module

## Setup

In [19]:
import csv
from dataclasses import dataclass

@dataclass
class InventoryItem:
    '''Class for keeping track of an item in inventory.'''
    name: str
    price: float
    quantity: int = 0

    def total_cost(self) -> float:
        '''Return the value of the stock held: ``price`` times ``quantity``.'''
        # Use the fields this class declares; the former attribute names
        # (unit_price, quantity_on_hand) do not exist on the dataclass.
        return self.price * self.quantity

## Solution

In [20]:
# Read the pipe-delimited inventory file into a list of InventoryItem records.
items = []
# newline='' is what the csv module asks for, so that line endings inside
# quoted fields reach the reader unmodified.
with open('data/Inventory.txt', newline='') as fh:
    inventory = csv.reader(fh, delimiter="|",
                               quotechar="/",
                               escapechar="%")
    next(inventory, None)  # discard the header row; tolerates an empty file
    for row in inventory:
        if not row:
            continue  # blank line
        name = row[0]
        price = float(row[1])
        # Quantity may contain comma separators or be empty (treated as 0).
        quantity = int(row[2].replace(',', '')) if row[2] else 0
        items.append(InventoryItem(name, price, quantity))

## Tests

In [21]:
def test_structure():
    """Compare ``items`` with the reference list stored in data/Inventory.pkl."""
    import pickle
    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    # The context manager closes the file handle once the data is read.
    with open('data/Inventory.pkl', 'rb') as fh:
        correct = pickle.load(fh)
    assert len(correct) == len(items), "Wrong number of records constructed"
    assert correct == items, "Error in one or more records"


test_structure()

# 06 CSV with Pandas

## Setup

In [22]:
import pandas as pd
# Empty placeholder; the solution cell rebinds items to the loaded DataFrame.
items = pd.DataFrame()

## Solution

In [23]:
# Load the pipe-delimited inventory file: '/' is the quote character, '%' the
# escape character, and ',' is stripped as a thousands separator.
items = pd.read_csv('data/Inventory.txt', 
                    skip_blank_lines=True, 
                    sep='|',
                    quotechar='/',
                    escapechar='%',
                    thousands=',')
# Quantity shows up as float64 in the output below, presumably because one
# record has no quantity value.
print(items.dtypes)
items

Name         object
Price       float64
Quantity    float64
dtype: object


Unnamed: 0,Name,Price,Quantity
0,Wankle rotary engine,555.55,527.0
1,Sousaphone w/ stand,333.33,123.0
2,Feather Duster,22.22,900.0
3,Area 51 metal fragment,9999.99,
4,The kitchen sink,129.99,43.0
5,Steak knife,12.49,1000000.0


## Tests

In [24]:
def test_correct_df():
    """Compare ``items`` with the reference DataFrame in data/Inventory-df.pkl."""
    import pickle
    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    # The context manager closes the file handle once the data is read.
    with open('data/Inventory-df.pkl', 'rb') as fh:
        correct = pickle.load(fh)
    assert isinstance(items, pd.DataFrame)
    assert correct.equals(items), "Generated DataFrame does not match"


test_correct_df()

# 07 XML with ElementTree

## Setup

In [25]:
import xml.etree.ElementTree as ET

# ElementTree has no pretty-printer before Python 3.9 (ET.indent), so go through minidom
import xml.dom.minidom
def pretty_etree(doc):
    """Return ``doc`` (an ElementTree element) as indented XML text.

    The element is serialized with ElementTree, re-parsed with minidom and
    pretty-printed using a two-space indent.
    """
    serialized = ET.tostring(doc)
    return xml.dom.minidom.parseString(serialized).toprettyxml(indent='  ')

## Solution

In [26]:
# Root element; its children are appended in document order.
band = ET.Element('band')

name = ET.SubElement(band, 'name')
name.text = 'L7'

guitarists = ET.SubElement(band, 'guitarists')
for guitarist in ("Suzi Gardner", "Donita Sparks"):
    # One <item> child per guitarist.
    g = ET.SubElement(guitarists, 'item')
    g.text = guitarist

numalbum = ET.SubElement(band, 'numalbums')
numalbum.text = '7'  # stored as text, not as an int

## Tests

In [27]:
def test_doc_type():
    """``band`` must be an ElementTree Element."""
    expected_type = ET.Element
    assert isinstance(band, expected_type)


test_doc_type()

In [28]:
def test_doc_structure():
    """The children of ``band`` appear in the expected order."""
    child_tags = [child.tag for child in band]
    assert child_tags == ['name', 'guitarists', 'numalbums']


test_doc_structure()

In [29]:
def test_guitarists():
    """The <guitarists> element lists both guitarists as <item> children."""
    entries = band.find('guitarists').findall('item')
    names = [entry.text for entry in entries]
    assert names == ['Suzi Gardner', 'Donita Sparks']


test_guitarists()

# 08 XML with lxml.objectify

## Setup

In [30]:
from lxml import objectify
# Empty placeholder element; the solution cell rebinds doc to the parsed data.
doc = objectify.ObjectifiedElement()
# Each query is a Python expression string that the tests eval() against doc.
query1 = "doc.bands.band[2].formation"
# Ellipsis placeholders to be replaced with query strings by the solution.
query2 = ...
query3 = ...
query4 = ...

## Solution

In [31]:
# Parse inside a context manager so the file handle is closed once the
# document has been read.
with open('data/Bands.xml') as fh:
    doc = objectify.parse(fh)
# Wrap the parsed root in a new <root> element so the file's own root element
# is reached as an attribute of doc (the queries use doc.bands).
doc = objectify.E.root(doc.getroot())

# Last two <item> entries of the first band's guitarists.
query2 = "doc.bands.band[0].guitarists.item[-2:]"
# Name of every band.
query3 = "[b.name for b in doc.bands.band[:]]"
# Total of numalbums over all bands.
query4 = "sum(b.numalbums for b in doc.bands.band[:])"

## Tests

In [32]:
def test_doc_type():
    """``doc`` must be an lxml objectify element."""
    expected_type = objectify.ObjectifiedElement
    assert isinstance(doc, expected_type)


test_doc_type()

In [33]:
def test_third_formation():
    """query1 is an expression string that evaluates to 1975."""
    assert isinstance(query1, str)
    formation = eval(query1)
    assert formation == 1975


test_third_formation()

In [34]:
def test_guitarists():
    """query2 starts at ``doc`` and selects Jim Jones and Wayne Kramer."""
    assert query2.startswith('doc.')
    selected = set(eval(query2))
    assert selected == {'Jim Jones', 'Wayne Kramer'}


test_guitarists()

In [35]:
def test_bands():
    """query3 must be a ``doc``-based expression yielding all three band names."""
    assert isinstance(query3, str)
    # Inspect the query under test (query3), not a sibling query.
    assert 'doc.' in query3
    assert set(eval(query3)) == {'Pere Ubu', 'L7', 'The Runaways'}


test_bands()

In [36]:
def test_albums():
    """query4 is a ``doc``-based expression string that evaluates to 29."""
    assert isinstance(query4, str)
    assert 'doc.' in query4
    total_albums = eval(query4)
    assert total_albums == 29


test_albums()

# 09 XPATH

## Setup

In [37]:
import xml.etree.ElementTree as ET
# Empty placeholder tree; the solution cell rebinds doc to the parsed file.
doc = ET.ElementTree()

def text_of(doc, xpath):
    """Return the text of the elements of ``doc`` matched by ``xpath``.

    Returns the bare text when exactly one element matches; otherwise a list
    of texts (empty when nothing matches).
    """
    texts = []
    for element in doc.findall(xpath):
        texts.append(element.text)
    if len(texts) == 1:
        return texts[0]
    return texts

# Worked example: the formation element of the third band.
xpath1 = ".//band[3]/formation"
# Ellipsis placeholders to be replaced with XPath strings by the solution.
xpath2 = ...
xpath3 = ...
xpath4 = ...

## Solution

In [38]:
# Parse the bands file into an ElementTree.
doc = ET.parse('data/Bands.xml')

# Third-from-last <item> in the first band's first <guitarists> element.
xpath2 = ".//band[1]/guitarists[1]/item[last()-2]"
# <name> of every <band> directly under the root.
xpath3 = "./band/name"
# <numalbums> of every <band> directly under the root.
xpath4 = "./band/numalbums"

def count(nums):
    """Return the sum of ``nums`` after converting each item with ``int``."""
    total = 0
    for value in nums:
        total += int(value)
    return total

## Tests

In [39]:
def test_doc_type():
    """``doc`` must be an ElementTree (not a bare Element)."""
    expected_type = ET.ElementTree
    assert isinstance(doc, expected_type)


test_doc_type()

In [40]:
def test_third_formation():
    """xpath1 selects a single element whose text is '1975'."""
    formation = text_of(doc, xpath1)
    assert formation == '1975'


test_third_formation()

In [41]:
def test_guitarist():
    """xpath2 selects a single element whose text is 'Mayo Thompson'."""
    guitarist = text_of(doc, xpath2)
    assert guitarist == 'Mayo Thompson'


test_guitarist()

In [42]:
def test_bands():
    """xpath3 selects the names of all three bands (order is not checked)."""
    band_names = set(text_of(doc, xpath3))
    assert band_names == {'Pere Ubu', 'L7', 'The Runaways'}


test_bands()

In [43]:
def test_albums():
    """xpath4 selects the three album counts, which ``count`` totals to 29."""
    # Collected as a set of strings; the three expected values are distinct.
    album_counts = set(text_of(doc, xpath4))
    assert album_counts == {'18', '7', '4'}
    assert count(album_counts) == 29


test_albums()

# 10 Serializing NumPy

## Setup

In [45]:
import numpy as np

# Ellipsis placeholders; the solution cell rebinds each name to a NumPy array.
arr1 = ...
arr2 = ...
arr3 = ...

## Solution

In [77]:
import os

# 300,000 random uint8 values reshaped to 20 x 30 x 10 x 50. randint's upper
# bound is exclusive, so 51 yields values from 10 to 50 inclusive.
arr1 = np.random.randint(10, 51, 300_000, dtype=np.uint8).reshape(20, 30, 10, 50)
arr2 = np.zeros(100, dtype=complex)
arr3 = np.array(['Red', 'Green', 'Blue'])

# savez_compressed does not create missing directories, so make sure the
# output directory exists before writing tmp/exercise.npz.
os.makedirs('tmp', exist_ok=True)
np.savez_compressed('tmp/exercise', arr1, arr2, arr3)

## Tests

In [60]:
def test_arr1():
    """arr1 is a uint8 array of shape (20, 30, 10, 50) spanning exactly 10..50."""
    dtype_name = str(arr1.dtype)
    assert dtype_name == 'uint8'
    assert arr1.shape == (20, 30, 10, 50)
    lowest = arr1.min()
    assert lowest == 10
    highest = arr1.max()
    assert highest == 50


test_arr1()

In [64]:
def test_arr2():
    """arr2 has a complex dtype and exactly 100 elements."""
    dtype_name = str(arr2.dtype)
    assert 'complex' in dtype_name
    assert arr2.size == 100


test_arr2()

In [71]:
def test_arr3():
    """arr3 holds exactly 'Red', 'Green', 'Blue', in that order."""
    assert arr3.size == 3
    for index, colour in enumerate(('Red', 'Green', 'Blue')):
        assert arr3[index] == colour


test_arr3()

In [80]:
def test_archive():
    import os
    fsize = os.stat('tmp/exercise.npz').st_size 
    assert fsize < 220_000, "The file might not have been compressed"
    data = list(np.load('tmp/exercise.npz').values())
    assert data[0].shape == (20, 30, 10, 50)
    assert str(data[0].dtype) == 'uint8'
    assert 'complex' in str(data[1].dtype)
    assert set(data[2]) == {'Red', 'Green', 'Blue'}

test_archive()