# A Magnum of Opera (aka "A Lot of Work")

(or opuses, or opi, take your pick)

Previously, we focused on Flask and SQLAlchemy: we moved on from our Latency approach, structured our project better, and used tooling from the Python ecosystem to do so.

Rather than accelerating too quickly, I have kept most of the code the same as before, and focused on a practical example of dependency injection. The aim is to show how code can be decoupled, in this case with `injector`, and how that benefits separation of concerns and those original DDD ideas of separated layers: domain, application and persistence.

# Decorating the Lab

To demonstrate the power of decoupling - with or without an Inversion of Control container like `injector`'s - we will briefly rewrite the entire stack. Even more, we will switch to a completely different coding paradigm: coroutines. Our domain model and its tests will remain unchanged.

This requires one more Python construct that we have brushed over several times - decorators. Note that this is distinct from the _decorator pattern_, which, in Python, links more to mixins and the wrapper pattern we looked at in previous weeks (reminder: `class MyDict: _real_dict = {}`).

How do decorators work? They take a function or method, and return a tweaked version. Perhaps this means a side-effect, such as recording an action, or a pure transformation, such as adjusting passed arguments or return values.
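As a minimal sketch of the side-effect case, the snippet below records each call in a list. The names `record_call`, `calls` and `triple` are hypothetical, chosen only for illustration. It also applies the decorator by plain reassignment, which is what the `@` syntax is shorthand for.

```python
calls = []

def record_call(f):
    def wrapper(*args):
        # Side effect: note the name of the function being called
        calls.append(f.__name__)
        return f(*args)
    return wrapper

def triple(a):
    return 3 * a

# Writing @record_call above the def is shorthand for this reassignment
triple = record_call(triple)

print(triple(2), calls)
```

The return value of `triple` is untouched; only the bookkeeping in `calls` changes.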

They can be specified in a couple of ways:

In [None]:
class StringifyResult:
    # Called when Python hits the decorator in code
    def __init__(self, f):
        self._f = f

    # Called when you call the function
    def __call__(self, *args):
        original_result = self._f(*args)
        return str(original_result)

In [None]:
@StringifyResult
def add_numbers(a, b):
    return a + b

@StringifyResult
def double_numbers(a):
    return 2 * a

In [None]:
add_numbers(1, 3)
double_numbers(3)

'6'

A more succinct and common, but slightly less intuitive, way of writing this is as a function that returns a function:

In [None]:
def stringify_result(f):
    # I've given args a and b for clarity, but *args, **kwargs
    # is more common and flexible
    def tweaked_f(a, b):
        original_result = f(a, b)
        return str(original_result)

    return tweaked_f

In [None]:
@stringify_result
def add_numbers(a, b):
    return a + b

In [None]:
add_numbers(1, 3)

'4'
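The comment in the function above mentions `*args, **kwargs`; a minimal sketch of that more general form follows. The use of `functools.wraps` from the standard library is an addition not in the original, and `stringify_any_result` is a hypothetical name.

```python
import functools

def stringify_any_result(f):
    # functools.wraps copies __name__, __doc__ and similar metadata
    # from f onto the wrapper, which helps with debugging and docs
    @functools.wraps(f)
    def tweaked_f(*args, **kwargs):
        # Forward whatever arguments were given, then convert the result
        return str(f(*args, **kwargs))

    return tweaked_f

@stringify_any_result
def add_numbers(a, b=0):
    """Add two numbers."""
    return a + b

print(add_numbers(1, b=3))
print(add_numbers.__name__)
```

Without `functools.wraps`, `add_numbers.__name__` would report `tweaked_f`, since the decorated name now points at the inner function.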

We could do an entire session, or series of sessions, or [meetup series](https://twitter.com/belfastfp) on functional programming, but a brief aside will suffice to whet the appetite.

### Functional Works

Functional programming can get very theoretical - there are excellent intros to the [computing theory](https://github.com/hmemcpy/milewski-ctfp-pdf) that give a whole new way of thinking about programming. For the moment:

* functional programming focuses on functions, rather than objects
* state is local, not global or maintained - values are passed into functions, and returned from them
* functions can be passed all over the show
* it focuses on calling functions, nesting right down, rather than long sequences of commands (imperative programming)

In Python, we rarely take an exclusively functional approach, but venerable languages such as Haskell, Scala, Lisp, Clojure, (and dozens of new ones known only in fashionable coffee shops in Shoreditch) can be very or entirely functional.

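However, it is important to be aware of the functional approach - it is the theoretical basis of MapReduce and of much massively scalable code. If state is not shared, globally or at a class/object level, parallelisation becomes far simpler, because each piece of work can run without coordinating with the others.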

In [None]:
class Sequence:
    content = 'abcbabbbcbabccba'
    def count_distinct(self):
        distinct = set(self.content)
        return {character: self.content.count(character) for character in distinct}

my_seq = Sequence()
my_seq.count_distinct()

{'c': 4, 'b': 8, 'a': 4}

vs

In [None]:
def count_distinct(content):
    distinct = set(content)
    return {character: content.count(character) for character in distinct}

my_seq = 'abcbabbbcbabccba'
count_distinct(my_seq)

{'c': 4, 'b': 8, 'a': 4}

Why is the second helpful?

In [None]:
import textwrap

def split_content_to_substrings(content):
    return textwrap.fill(content, 4).split()

def compose_distinct(distincts):
    print('DISTINCTS:', list(distincts))

    # Work out all occurring characters
    all_characters = set(sum([list(d.keys()) for d in distincts], []))

    counts = {c: 0 for c in all_characters}
    # Less functional - this could be map/comprehension, but easier to read!
    for distinct in distincts:
        for c, ct in distinct.items():
            counts[c] += ct

    return counts

# a series of composed functions
compose_distinct(list(map(count_distinct, split_content_to_substrings(my_seq))))

DISTINCTS: [{'c': 1, 'b': 2, 'a': 1}, {'b': 3, 'a': 1}, {'a': 1, 'b': 2, 'c': 1}, {'a': 1, 'b': 1, 'c': 2}]


{'a': 4, 'b': 8, 'c': 4}

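Unlike the first, this could be immediately parallelized: each `count_distinct` call depends only on the substring it is given, so the calls can run in any order, or at the same time.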
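A minimal sketch of that parallel version, assuming the standard library's `concurrent.futures` is acceptable here. The helpers `chunks` and `parallel_count_distinct` are hypothetical names, and `collections.Counter` is swapped in for the hand-written merge loop. Threads are used to keep the example simple; in CPython they will not speed up CPU-bound work, and `ProcessPoolExecutor` offers the same `map` interface if real parallelism is wanted.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_distinct(content):
    # Same pure function as above: it depends only on its argument
    return {character: content.count(character) for character in set(content)}

def chunks(content, size):
    # Split the string into fixed-size pieces
    return [content[i:i + size] for i in range(0, len(content), size)]

def parallel_count_distinct(content, size=4, workers=4):
    # Map step: count each chunk independently on the pool
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partial_counts = list(pool.map(count_distinct, chunks(content, size)))
    # Reduce step: Counter.update adds counts rather than replacing them
    total = Counter()
    for partial in partial_counts:
        total.update(partial)
    return dict(total)

result = parallel_count_distinct('abcbabbbcbabccba')
print(result)
```

The class-based `Sequence` would need reworking before it could be split up this way, as its data lives on the class rather than being passed in.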

### More Painting

Many decorators exist:

* `@classmethod`: indicates a class method (first arg is the class, not `self`)
* `@staticmethod`: indicates a static method (no implicit first arg, neither `self` nor the class)
* `@timeit`: indicates you want this function timed (recall timeit from the Latency exercises)
* `@contextmanager`: preps a function for use in a with statement
* `@property`: indicates that you want to expose a getter and/or setter as an object property
* `@pytest.fixture`: indicates that the function it applies to defines an injectable fixture
* `@app.route`: indicates that the following function is a route-handler in Flask
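To make a few of the built-in decorators from the list concrete, here is a small sketch using only the standard library. The `Cauldron` class and the `heated` context manager are hypothetical, invented for this illustration.

```python
from contextlib import contextmanager

class Cauldron:
    def __init__(self, litres):
        self._litres = litres

    @property
    def litres(self):
        # Read as cauldron.litres, with no parentheses
        return self._litres

    @litres.setter
    def litres(self, value):
        if not self.is_valid(value):
            raise ValueError("volume cannot be negative")
        self._litres = value

    @classmethod
    def empty(cls):
        # Receives the class itself, handy for alternative constructors
        return cls(0)

    @staticmethod
    def is_valid(value):
        # No implicit first argument at all
        return value >= 0


@contextmanager
def heated(cauldron, log):
    log.append("heat on")
    try:
        yield cauldron  # the body of the with block runs at this point
    finally:
        log.append("heat off")


log = []
cauldron = Cauldron.empty()
with heated(cauldron, log) as c:
    c.litres = 2
print(cauldron.litres, log)
```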

Note from the last one, that decorators can be written as object methods, as well as functions.
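A toy sketch of a decorator written as an object method follows. `TinyRouter` is a hypothetical class that only illustrates the idea of registering handlers; it is not how Flask implements `@app.route`.

```python
class TinyRouter:
    def __init__(self):
        self.handlers = {}

    def route(self, path):
        # route(path) returns the actual decorator,
        # which is why @router.route('/substances') takes an argument
        def register(f):
            self.handlers[path] = f
            return f  # the function itself is left unchanged
        return register

    def dispatch(self, path):
        # Look up the handler registered for this path and call it
        return self.handlers[path]()

router = TinyRouter()

@router.route('/substances')
def list_substances():
    return ['Sulphur']

print(router.dispatch('/substances'))
```

Because the decorator is a method, the registry lives on the `router` object, so two routers can hold separate sets of handlers.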

## Coroutines

Coroutines in Python started out as decorators - `@asyncio.coroutine` - but this approach was deprecated in Python 3.8 and removed in 3.11, so you will generally see the syntax:

In [None]:
async def my_asynchronous_function(a, b):
    return a + b

This should look like any other function, except the `async`. And, in fact, it really is - except that:

In [None]:
my_asynchronous_function(1, 2)

<coroutine object my_asynchronous_function at 0x000001A45F905490>

It returns a coroutine. This is a construct that can be scheduled, and may have a result, but not immediately. They can be chained. Again, we'll look at practical examples rather than delving into coroutine theory. However, coroutines work on the principle of an event loop that ensures they actually happen.

In [None]:
import asyncio

# This approach is required if no event loop yet exists - but in Jupyter it does
# asyncio.run(my_asynchronous_function(1, 2))

await my_asynchronous_function(1, 2)

3
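Outside Jupyter, for instance in a plain script, there is no running event loop and top-level `await` is not available. A minimal sketch of the equivalent, also showing one coroutine chained onto another (`add` and `add_then_double` are hypothetical names):

```python
import asyncio

async def add(a, b):
    return a + b

async def add_then_double(a, b):
    # await suspends this coroutine until add() has produced its result
    total = await add(a, b)
    return 2 * total

# asyncio.run creates an event loop, runs the coroutine to completion,
# then closes the loop again
result = asyncio.run(add_then_double(1, 2))
print(result)
```

Note that `asyncio.run` raises a `RuntimeError` if a loop is already running in the current thread, which is why the notebook cells use `await` directly instead.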

The `await` syntax allows us to compose these like ordinary functions - but if some IO, sleep or such is taking time, Python can carry on executing other coroutines that have been scheduled, and switch between them, returning control to the first one when its blocking step is done.

In [None]:
async def my_slow_function():
    print('Doing')
    await asyncio.sleep(3)
    print('Done')

loop = asyncio.get_event_loop()
result = loop.create_task(my_slow_function())

await asyncio.sleep(1)
other_result = await my_asynchronous_function(1, 2)
print(other_result)
await result

Doing
3
Done


An example of where this is useful occurs in HTTP responses...
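As a rough sketch of why this matters for HTTP, the snippet below simulates three slow requests with `asyncio.sleep` standing in for network waits; no real HTTP library is used, and `fake_request` is a hypothetical name. It is written for a plain script, so it uses `asyncio.run`; in Jupyter you would `await main()` instead.

```python
import asyncio
import time

async def fake_request(name, delay):
    # asyncio.sleep stands in for waiting on a network response
    await asyncio.sleep(delay)
    return f"{name}: done"

async def main():
    started = time.perf_counter()
    # gather schedules all three coroutines and waits for all of them;
    # results come back in the order the coroutines were passed in
    responses = await asyncio.gather(
        fake_request("a", 0.2),
        fake_request("b", 0.2),
        fake_request("c", 0.2),
    )
    elapsed = time.perf_counter() - started
    return responses, elapsed

responses, elapsed = asyncio.run(main())
print(responses)
print(f"{elapsed:.2f}s")
```

The three waits overlap, so the total time should be close to the longest single delay rather than the sum of all three.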

## Opera on Tour

We have explored the Magnum Opus server in a number of different ways, and seen how we can separate business logic from our application.

Our main exercise will be to work through the new code, and to adapt it for a new type of database.

## Raising the roof

First a few extra dependencies:

In [None]:
!pip3 install findspark cassandra-driver flask-cqlalchemy flask


In [None]:
!curl -L -O http://dl.bintray.com/spark-packages/maven/datastax/spark-cassandra-connector/2.4.0-s_2.11/spark-cassandra-connector-2.4.0-s_2.11.jar



Cassandra is a highly scalable NoSQL database - we have one conveniently set up.

In [None]:
CASSANDRA_PASSWORD = "Le2V2gZ0nk"

In [None]:
import os
import findspark

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['CQLENG_ALLOW_SCHEMA_MANAGEMENT'] = '1'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.2 --conf spark.cassandra.connection.host=cassandra-0 pyspark-shell'
findspark.init()


ValueError: Couldn't find Spark, make sure SPARK_HOME env is set or Spark is in an expected location (e.g. from homebrew installation).

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)

In [None]:
import pyspark
conf = pyspark.SparkConf()

conf.set("spark.jars", "./spark-cassandra-connector-2.4.0-s_2.11.jar") # spark-cassandra-connector jar

conf.set("spark.cassandra.connection.host", "cassandra")
conf.set("spark.cassandra.auth.username", "cassandra")
conf.set("spark.cassandra.auth.password", CASSANDRA_PASSWORD)

sc = pyspark.SparkContext(conf=conf)

ModuleNotFoundError: No module named 'pyspark'

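I don't generally use standard examples, as you'll see them anyway, but this is a particularly nice example of how stats and parallelism can let you calculate `pi`. Points are drawn uniformly from the unit square, and the fraction landing inside the quarter circle of radius 1 approaches `pi / 4`: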

In [None]:
import random

num_samples = 100000000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
# Not calling sc.stop() here: the context is reused by the cells below

NameError: name 'sc' is not defined
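The same estimate can be sketched locally without Spark, which is handy when no cluster is available. This is a single-process version with a much smaller sample count; `estimate_pi` is a hypothetical helper, and the seed is an assumption added to make runs repeatable.

```python
import math
import random

def estimate_pi(num_samples, seed=0):
    rng = random.Random(seed)  # seeded so the run is repeatable
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        # The point lies inside the quarter circle of radius 1
        if x * x + y * y < 1:
            inside += 1
    # Quarter-circle area / square area = pi / 4
    return 4 * inside / num_samples

pi_estimate = estimate_pi(200_000)
print(pi_estimate, abs(pi_estimate - math.pi))
```

Each sample is independent of the others, which is what lets Spark spread the `filter(inside)` step across workers.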

We can use Spark to handle our data that has been inserted into Cassandra.

In [None]:
from pyspark.sql import SQLContext
sql_context = SQLContext(sc)

NameError: name 'sc' is not defined

In [None]:
table_df = sql_context.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table='substance', keyspace='pythoncourse')\
        .load()

### A few hints

Helpfully, someone created the `flask_cqlalchemy` module to make it easier (though not trivial) to use SQLAlchemy-style code with Cassandra.

In [None]:
from cassandra.auth import PlainTextAuthProvider

In [None]:
import uuid
from flask import Flask
from flask_cqlalchemy import CQLAlchemy

app = Flask(__name__)
app.config['CASSANDRA_HOSTS'] = ['cassandra']
app.config['CASSANDRA_KEYSPACE'] = "pythoncourse"
app.config['CASSANDRA_SETUP_KWARGS'] = {
    'protocol_version': 3,
    'auth_provider': PlainTextAuthProvider(
        username='cassandra', password=CASSANDRA_PASSWORD),
}
db = CQLAlchemy(app)


class Substance(db.Model):
    id = db.columns.UUID(primary_key=True, default=uuid.uuid4)
    nature = db.columns.Text()
    state = db.columns.List(db.columns.Text())

UnresolvableContactPoints: {}

In [None]:
#db.sync_db()

In [None]:
for substance in Substance.objects().all():
    print(substance.id, substance.nature)

522b6440-bdee-4d82-b531-030edb4248a3 Sulphur
4d76c633-26bb-4ed8-be44-558440d5d7e5 Sulphur
b6d2e363-30a8-4e5e-92a3-8c7c8391b060 Sulphur
f7598b0a-1050-4a5e-96be-7785bd8b2f27 Sulphur
1f1a941b-019a-4f4f-acfd-09968f6e9d03 Sulphur
a227e5fc-6b81-4a54-a421-183a361368d1 Sulphur


What can we do with this? Looking back at our earlier work on counting distinct perhaps...

In [None]:
table_df.groupby('nature').count().toPandas()

|   | nature | count |
|---|---|---|
| 0 | Sulphur | 6 |


In [None]:
from pyspark.sql.functions import approx_count_distinct, countDistinct
nature_counts = table_df.agg(approx_count_distinct(table_df.nature).alias('distinct_natures')).collect()
sql_context.createDataFrame(nature_counts).toPandas()

|   | distinct_natures |
|---|---|
| 0 | 1 |


In [None]:
# Exact count, for comparison with the approximate count above
nature_counts = table_df.agg(countDistinct(table_df.nature).alias('distinct_natures')).collect()
sql_context.createDataFrame(nature_counts).toPandas()

|   | distinct_natures |
|---|---|
| 0 | 1 |
