In [116]:
import trio

async def async_double(x):
    
    # here 'f' is a coroutine. instructions say to always use await straight away for a trio functions
    f = trio.sleep(1)
    x = x * x
    print(f)
    await f
    print(f)
    return x

trio.run(async_double, 3)


<coroutine object sleep at 0x7fa1e04943c0>
<coroutine object sleep at 0x7fa1e04943c0>


9

In [117]:
# useful things in the below:
# a nursury is a context manager to store children
# all of the tasks are running concurrently inside the async with block,
# start_soon() spawns child process and returns immediately without waiting for the function to finish.
# 

import trio

async def child1():
    print("  child1: started! sleeping now...")
    await trio.sleep(1)
    print("  child1: exiting!")

async def child2():
    print("  child2: started! sleeping now...")
    await trio.sleep(1)
    print("  child2: exiting!")

async def parent():
    print("parent: started!")
    async with trio.open_nursery() as nursery:
        print("parent: spawning child1...")
        nursery.start_soon(child1)

        print("parent: spawning child2...")
        nursery.start_soon(child2)

        print("parent: waiting for children to finish...")
        # -- we exit the nursery block here --
        # at this point it waits for all coroutines to complete
    print("parent: all done!")

trio.run(parent)

parent: started!
parent: spawning child1...
parent: spawning child2...
parent: waiting for children to finish...
  child1: started! sleeping now...
  child2: started! sleeping now...
  child1: exiting!
  child2: exiting!
parent: all done!


In [118]:
# inheriting a trio class to make Tracer: a class to apply to an async func to view what's going on in detail

class Tracer(trio.abc.Instrument):
    def before_run(self):
        print("!!! run started")

    def _print_with_task(self, msg, task):
        # repr(task) is perhaps more useful than task.name in general,
        # but in context of a tutorial the extra noise is unhelpful.
        print(f"{msg}: {task.name}")

    def task_spawned(self, task):
        self._print_with_task("### new task spawned", task)

    def task_scheduled(self, task):
        self._print_with_task("### task scheduled", task)

    def before_task_step(self, task):
        self._print_with_task(">>> about to run one step of task", task)

    def after_task_step(self, task):
        self._print_with_task("<<< task step finished", task)

    def task_exited(self, task):
        self._print_with_task("### task exited", task)

    def before_io_wait(self, timeout):
        if timeout:
            print(f"### waiting for I/O for up to {timeout} seconds")
        else:
            print("### doing a quick check for I/O")
        self._sleep_time = trio.current_time()

    def after_io_wait(self, timeout):
        duration = trio.current_time() - self._sleep_time
        print(f"### finished I/O check (took {duration} seconds)")

    def after_run(self):
        print("!!! run finished")
        
        
trio.run(parent, instruments=[Tracer()])


!!! run started
### new task spawned: <init>
### task scheduled: <init>
### doing a quick check for I/O
### finished I/O check (took 0.0005784580134786665 seconds)
>>> about to run one step of task: <init>
### new task spawned: __main__.parent
### task scheduled: __main__.parent
### new task spawned: <TrioToken.run_sync_soon task>
### task scheduled: <TrioToken.run_sync_soon task>
<<< task step finished: <init>
### doing a quick check for I/O
### finished I/O check (took 0.00017316697631031275 seconds)
>>> about to run one step of task: __main__.parent
parent: started!
parent: spawning child1...
### new task spawned: __main__.child1
### task scheduled: __main__.child1
parent: spawning child2...
### new task spawned: __main__.child2
### task scheduled: __main__.child2
parent: waiting for children to finish...
<<< task step finished: __main__.parent
>>> about to run one step of task: <TrioToken.run_sync_soon task>
<<< task step finished: <TrioToken.run_sync_soon task>
### doing a quick c

In [119]:
import trio
from itertools import count

# Port is arbitrary, but:
# - must be in between 1024 and 65535
# - can't be in use by some other program on your computer
# - must match what we set in our echo client
PORT = 12345

CONNECTION_COUNTER = count()

async def echo_server(server_stream):
    # Assign each connection a unique number to make our debug prints easier
    # to understand when there are multiple simultaneous connections.
    ident = next(CONNECTION_COUNTER)
    print(f"echo_server {ident}: started")
    try:
        async for data in server_stream:
            print(f"echo_server {ident}: received data {data!r}")
            await server_stream.send_all(data)
        print(f"echo_server {ident}: connection closed")
    # FIXME: add discussion of (Base)ExceptionGroup to the tutorial, and use
    # exceptiongroup.catch() here. (Not important in this case, but important
    # if the server code uses nurseries internally.)
    except Exception as exc:
        # Unhandled exceptions will propagate into our parent and take
        # down the whole program. If the exception is KeyboardInterrupt,
        # that's what we want, but otherwise maybe not...
        print(f"echo_server {ident}: crashed: {exc!r}")


async def main():
    await trio.serve_tcp(echo_server, PORT)


# We could also just write 'trio.run(trio.serve_tcp, echo_server, PORT)', but real
# programs almost always end up doing other stuff too and then we'd have to go
# back and factor it out into a separate function anyway. So it's simplest to
# just make it a standalone function from the beginning.
trio.run(main)

KeyboardInterrupt: 

Only async functions can call other async functions


trio has zillions of async libraries for everything, eg http requests: https://www.python-httpx.org/async/


Q: The big idea behind async/await-based libraries is to [ --- do what ---- ?]
A: run lots of tasks simultaneously on a single thread by switching between them at appropriate places 

Q: can you mix async and trio primitives in a function:
A: No, they don't interact well


Q: why context manage with 'async with' not just 'with'?
A: need to be able to block at exit()/aexit() and wait for the children to finish;


Q: what is the point of calling 'await' if the function would never run without it?






In [8]:

class PrintLog:
    def __init__(self, func):
        self.func = func
    def __call__(self, *args, **kwargs):
        print('CALLING: {}'.format(self.func.__name__)) 
        return self.func(*args, **kwargs)

@PrintLog
def nf(strr):
    print(strr)
nf('hi')



CALLING: nf
hi


In [26]:

import sys
class ResultAnnouncer:
    stream = sys.stdout 
    prefix = "RESULT"
    def __init__(self, func):
        self.func = func
    def __call__(self, *args, **kwargs):
        value = self.func(*args, **kwargs)
        self.stream.write('{}: {}\n'.format(self.prefix,value))
        return value

class StdErrResultAnnouncer(ResultAnnouncer): 
    stream = sys.stderr
    prefix = "ERROR QS"
    
@StdErrResultAnnouncer
def pt():
    print('po')
pt()



po


ERROR QS: None


In [59]:
def autorepr(klass):
    def klass_repr(self):
        return '{}, {}'.format(klass.__name__ + ' instance', self.value)
    klass.__repr__ = klass_repr
    return klass

@autorepr
class Penny():
    def __init__(self, value):
        self.value = value
    def __repr__(self):
        return 'im new!'
    def printme(self):
        print(self.value, 'hahah')

pp = Penny(3)
print(pp)
pp.printme()

Penny instance, 3
3 hahah


In [105]:
import socket
import ssl

# Create a socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Wrap the socket in an SSL context
ssl_sock = ssl.wrap_socket(sock,
                           ca_certs="server.crt",
                           cert_reqs=ssl.CERT_REQUIRED)

# Connect to the server
ssl_sock.connect(("www.example.com", 443))

# Send and receive data over the SSL connection
ssl_sock.sendall(b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n")
response = ssl_sock.recv(1024)
print(response)

OSError: [Errno 57] Socket is not connected

In [100]:
list(ipaddress.ip_network('192.0.2.0/28').hosts()) 


[IPv4Address('192.0.2.1'),
 IPv4Address('192.0.2.2'),
 IPv4Address('192.0.2.3'),
 IPv4Address('192.0.2.4'),
 IPv4Address('192.0.2.5'),
 IPv4Address('192.0.2.6'),
 IPv4Address('192.0.2.7'),
 IPv4Address('192.0.2.8'),
 IPv4Address('192.0.2.9'),
 IPv4Address('192.0.2.10'),
 IPv4Address('192.0.2.11'),
 IPv4Address('192.0.2.12'),
 IPv4Address('192.0.2.13'),
 IPv4Address('192.0.2.14')]

In [106]:
def vv():
    for i in range(3):
        yield i
        
list(vv())

[0, 1, 2]

In [50]:
@Timer
def some_function2(delay):
    from time import sleep
 
    # Introducing some time delay to
    # simulate a time taking function.
    sleep(delay)
some_function2(0.4)

self.function: some_function2
Execution took 0.40506792068481445 seconds


In [39]:
import re

re.findall(r'[a-z]+|[A-Z][a-z]+', 'tee hah Oisnfif dghf')
[a[1:] for a in re.findall(r'^[a-z]+|\s[a-z]+', 'tee hah Oisnfif dghf')]



['ee', 'hah', 'dghf']

In [2]:
import timeit
timeit.repeat("8*8", repeat=2, number=1)

[1.4170000213198364e-06, 3.330000026835478e-07]

In [216]:
'hah oisnfif dghf'.split('h')

['', 'a', ' oisnfif dg', 'f']

In [None]:
## iterators
mytuple = ("apple", "banana", "cherry")
myit = iter(mytuple)
next(myit)


# __iter__ method works like __init__, but the variable is iterable
class MyNumbers:
  def __iter__(self):
    self.a = 1
    return self

  def __next__(self):
    x = self.a
    self.a += 1
    return x

myclass = MyNumbers()
myiter = iter(myclass)

print(next(myiter))
print(next(myiter))
print(next(myiter))
print(next(myiter))
print(next(myiter))


## raise StopIteration to make it loop up to a point with 'for'
class MyNumbers:
  def __iter__(self):
    self.a = 1
    return self

  def __next__(self):
    if self.a <= 20:
      x = self.a
      self.a += 1
      return x
    else:
      raise StopIteration

myclass = MyNumbers()
myiter = iter(myclass)

for x in myiter:
  print(x)

In [None]:
# creating a generator by using 'yield' as part of __iter__
class MyIterable:
    def __init__(self, data):
        self.data = data

    def __iter__(self):
        for x in self.data:
            yield x
            

my_iterable = MyIterable([1, 2, 3])
for x in my_iterable:
    print(x)

In [None]:
'aioho422'.zfill(100)

In [None]:
v = memoryview(b'abcefg')
for l in v:
    print(l)

In [None]:
## memoryview
import array
a = array.array('l', [-11111111, 22222222, -33333333, 44444444])
m = memoryview(a)
m[1]



In [None]:
# memoryview object can be hashed
v = memoryview(b'abcefg')
hash(v) == hash(b'abcefg')

In [None]:
b"abc".hex()

In [None]:
my_bytearray = bytearray(b'Hello, world!')
mv = memoryview(my_bytearray)

# Do something with the memoryview
mv[0]=101

# Release the memory associated with the memoryview
mv.release()

# view change
my_bytearray

In [None]:
hash(frozenset([1,2,3]))

In [None]:
d = {"one": 1, "two": 2, "three": 3, "four": 4}
for i in reversed(d.items()):
    print(i)
    d[i[0]] = 3
d

In [None]:
for k,v in d.items():
    d[k] = 2
d

In [None]:
type(d.keys())

In [None]:
# confirm d (a dict) is a mapping object
import collections.abc
isinstance(d, collections.abc.Mapping)

In [None]:
# Create a custom mapping object: ie a dictionary with extra methods
class MyMapping(collections.abc.Mapping):
    def __init__(self, data):
        self._data = data

    def __getitem__(self, key):
        return self._data[key]
    
    def __setitem__(self, key, newdata):
        self._data[key] = newdata
    
    def __iter__(self):
        for x in self._data:
            yield x
            
    def __len__(self):
        l = 0
        for i in self:
            l += 1
        return l
    
k = MyMapping({1:2, 3:4})
print(len(k))
print(k[3])
for v in k.items():
    print(v)
k[2] = 5
print(k[2])

In [None]:
type(Ellipsis)()

In [None]:
Ellipsis

In [298]:
### Can use Ellipsis in slicing and indexing: doesn't seem useful
my_list = [1, 2, 3, 4, 5, 6]

# Use Ellipsis to include all elements up to the fourth element
print(my_list[:Ellipsis, 4])  # [1, 2, 3, 4]

# Use Ellipsis to include all elements after the second element
print(my_list[2, Ellipsis])  # [3, 4, 5, 6]

# Use Ellipsis to include all elements
print(my_list[Ellipsis])  # [1, 2, 3, 4, 5, 6]


TypeError: list indices must be integers or slices, not tuple

In [316]:
import math
l=2
np.math.ceil(math.log(l,2))

1

In [341]:
from itertools import permutations,combinations_with_replacement,combinations
from difflib import Differ

for i,v in enumerate(combinations_with_replacement('01', 6)):
    for k,p in enumerate(permutations(v)):
        print(p)


('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0', '0', '0', '0', '0')
('0', '0

('0', '1', '0', '1', '1', '0')
('0', '1', '0', '0', '1', '1')
('0', '1', '0', '0', '1', '1')
('0', '1', '0', '1', '0', '1')
('0', '1', '0', '1', '1', '0')
('0', '1', '0', '1', '0', '1')
('0', '1', '0', '1', '1', '0')
('0', '1', '1', '0', '0', '1')
('0', '1', '1', '0', '1', '0')
('0', '1', '1', '0', '0', '1')
('0', '1', '1', '0', '1', '0')
('0', '1', '1', '1', '0', '0')
('0', '1', '1', '1', '0', '0')
('0', '1', '1', '0', '0', '1')
('0', '1', '1', '0', '1', '0')
('0', '1', '1', '0', '0', '1')
('0', '1', '1', '0', '1', '0')
('0', '1', '1', '1', '0', '0')
('0', '1', '1', '1', '0', '0')
('0', '1', '0', '0', '1', '1')
('0', '1', '0', '0', '1', '1')
('0', '1', '0', '1', '0', '1')
('0', '1', '0', '1', '1', '0')
('0', '1', '0', '1', '0', '1')
('0', '1', '0', '1', '1', '0')
('0', '1', '0', '0', '1', '1')
('0', '1', '0', '0', '1', '1')
('0', '1', '0', '1', '0', '1')
('0', '1', '0', '1', '1', '0')
('0', '1', '0', '1', '0', '1')
('0', '1', '0', '1', '1', '0')
('0', '1', '1', '0', '0', '1')
('0', '1

('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1', '1', '1', '1', '1')
('1', '1

In [354]:
import torch
import torch.nn as nn

# Define the embedding layer
embedding_layer = nn.Embedding(num_embeddings=1000, embedding_dim=128)

# Define some input text
text = ["This is some text <END>", "Here is another sentence boom!"]

# Create a vocabulary dict of word to integers
vocab = {}
vocab.update({word: index for index, word in enumerate(sorted(set([word for sentence in text for word in sentence.split()])))})

# Convert the text to a tensor
text_tensor = torch.tensor([[vocab[word] for word in sentence.split()] for sentence in text])

# Pass the text tensor through the embedding layer
embeddings = embedding_layer(text_tensor)

print(f'embeddings shape: {embeddings.shape}')
print(f'text_tensor shape: {text_tensor.shape}')
print(embeddings)

embeddings shape: torch.Size([2, 5, 128])
text_tensor shape: torch.Size([2, 5])
tensor([[[-0.1154, -1.2457,  1.6246,  ..., -1.2603, -0.6655,  0.1675],
         [ 1.3548, -0.8825,  1.5452,  ..., -2.2052,  0.5543, -0.1857],
         [-1.8039,  0.1220,  0.6829,  ..., -0.2150,  1.0618,  0.5158],
         [ 0.2836,  0.9021,  2.5265,  ..., -0.2424,  0.9529, -0.4622],
         [-1.1045,  1.9471,  1.6222,  ...,  0.7429, -0.9221, -2.5055]],

        [[ 0.6739, -1.1727,  0.2848,  ..., -3.1661, -0.9429, -0.2341],
         [ 1.3548, -0.8825,  1.5452,  ..., -2.2052,  0.5543, -0.1857],
         [ 0.8300,  1.4309, -0.0902,  ...,  0.6676,  0.4115,  0.1965],
         [ 1.3262, -0.3683, -0.7054,  ..., -1.5003, -0.7968,  1.5404],
         [-0.5412,  0.3851, -1.7952,  ...,  0.4717,  1.7988, -0.9874]]],
       grad_fn=<EmbeddingBackward0>)


In [356]:
text_tensor

tensor([[2, 5, 7, 8, 0],
        [1, 5, 3, 6, 4]])

In [355]:
import itertools as it
d = {}
for i in it.combinations([1,2,3],2):
    d[i[0]] = i[1]
print(d)
from collections import OrderedDict
OrderedDict(d)

{1: 3, 2: 3}


OrderedDict([(1, 3), (2, 3)])

In [333]:
from collections import Counter
for k,v in dict(Counter('absfgfff')).items():
    print(k,v)

a 1
b 1
s 1
f 4
g 1


In [297]:
import numpy as np
np.random.rand(5,5).tolist()

[[0.8243956563265623,
  0.6133033185320714,
  0.42662638134076614,
  0.2644858988438138,
  0.5585532569497136],
 [0.9825218659072027,
  0.10996738890410296,
  0.777418044085358,
  0.8900216679187084,
  0.35259286264707024],
 [0.34810271053019193,
  0.25466442171510106,
  0.5860492081843034,
  0.23228457055237428,
  0.6220659599290727],
 [0.2656291920390398,
  0.13041202386735395,
  0.7594183166082848,
  0.8047532965555593,
  0.6163826589770598],
 [0.8061756493066585,
  0.24204510234558207,
  0.6331985890377313,
  0.6481585718675434,
  0.7872646925780927]]

In [295]:
from multiprocessing import Process, Pipe

def f(conn, intval):
    conn.send([intval, None, 'hello'])
    conn.close()
    print(intval)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    procs = []
    for i in range(3):
        p = Process(target=f, args=(child_conn,i,))
        procs.append(p)
        p.start()
        #print(parent_conn.recv())   # prints "[42, None, 'hello']"
    print('done')
    for p in procs:
        print(parent_conn.recv())
        p.join()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/adambricknell/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/adambricknell/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>


KeyboardInterrupt: 

In [290]:
for m in mm:
    print(m)

b'D'
b'E'
b'B'
b'U'
b'G'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'T'
b'h'
b'i'
b's'
b' '
b'm'
b'e'
b's'
b's'
b'a'
b'g'
b'e'
b' '
b's'
b'h'
b'o'
b'u'
b'l'
b'd'
b' '
b'g'
b'o'
b' '
b't'
b'o'
b' '
b't'
b'h'
b'e'
b' '
b'l'
b'o'
b'g'
b' '
b'f'
b'i'
b'l'
b'e'
b'\n'
b'I'
b'N'
b'F'
b'O'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'S'
b'o'
b' '
b's'
b'h'
b'o'
b'u'
b'l'
b'd'
b' '
b't'
b'h'
b'i'
b's'
b'\n'
b'W'
b'A'
b'R'
b'N'
b'I'
b'N'
b'G'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'A'
b'n'
b'd'
b' '
b't'
b'h'
b'i'
b's'
b','
b' '
b't'
b'o'
b'o'
b' '
b'3'
b'\n'
b'E'
b'R'
b'R'
b'O'
b'R'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'A'
b'n'
b'd'
b' '
b'n'
b'o'
b'n'
b'-'
b'A'
b'S'
b'C'
b'I'
b'I'
b' '
b's'
b't'
b'u'
b'f'
b'f'
b','
b' '
b't'
b'o'
b'o'
b','
b' '
b'l'
b'i'
b'k'
b'e'
b' '
b'\xc3'
b'\x98'
b'r'
b'e'
b's'
b'u'
b'n'
b'd'
b' '
b'a'
b'n'
b'd'
b' '
b'M'
b'a'
b'l'
b'm'
b'\xc3'
b'\xb6'
b'\n'
b'D'
b'E'
b'B'
b'U'
b'G'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'T'
b'h'
b'i'
b's'
b' '
b'm'
b'e'
b's'
b's'
b'a'
b'g'
b'e'
b' '
b's'
b'h'
b'o'
b'u'
b'l'
b'd'
b' '

b'1'
b'1'
b'1'
b'7'
b'7'
b'3'
b'6'
b'4'
b'e'
b'-'
b'0'
b'7'
b']'
b'\n'
b'D'
b'E'
b'B'
b'U'
b'G'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'T'
b'h'
b'i'
b's'
b' '
b'm'
b'e'
b's'
b's'
b'a'
b'g'
b'e'
b' '
b's'
b'h'
b'o'
b'u'
b'l'
b'd'
b' '
b'g'
b'o'
b' '
b't'
b'o'
b' '
b't'
b'h'
b'e'
b' '
b'l'
b'o'
b'g'
b' '
b'f'
b'i'
b'l'
b'e'
b'\n'
b'I'
b'N'
b'F'
b'O'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'S'
b'o'
b' '
b's'
b'h'
b'o'
b'u'
b'l'
b'd'
b' '
b't'
b'h'
b'i'
b's'
b'\n'
b'W'
b'A'
b'R'
b'N'
b'I'
b'N'
b'G'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'A'
b'n'
b'd'
b' '
b't'
b'h'
b'i'
b's'
b','
b' '
b't'
b'o'
b'o'
b' '
b'3'
b'\n'
b'E'
b'R'
b'R'
b'O'
b'R'
b':'
b'r'
b'o'
b'o'
b't'
b':'
b'A'
b'n'
b'd'
b' '
b'n'
b'o'
b'n'
b'-'
b'A'
b'S'
b'C'
b'I'
b'I'
b' '
b's'
b't'
b'u'
b'f'
b'f'
b','
b' '
b't'
b'o'
b'o'
b','
b' '
b'l'
b'i'
b'k'
b'e'
b' '
b'\xc3'
b'\x98'
b'r'
b'e'
b's'
b'u'
b'n'
b'd'
b' '
b'a'
b'n'
b'd'
b' '
b'M'
b'a'
b'l'
b'm'
b'\xc3'
b'\xb6'
b'\n'


In [289]:
f.fileno()


90

In [274]:
import itertools as it
n=4

x = [i for i in it.repeat(1,n)]
x.extend([i for i in it.repeat(2,n)])

combs = []
for i in it.permutations(x):
    for j,z in enumerate(it.accumulate(i)):
        if z == n:
            if i[:j+1] not in combs:
                combs.append(i[:j+1])
combs
    
    
    

[(1, 1, 1, 1), (1, 1, 2), (1, 2, 1), (2, 1, 1), (2, 2)]

In [268]:
x = [i for i in it.repeat(1,n)]
x.extend([i for i in it.repeat(2,n)])
x

[1, 1, 1, 1, 2, 2, 2, 2]

In [266]:
x=[i for i in it.repeat(1,n)].extend([i for i in it.repeat(2,n)])
x

In [None]:
# an async contextlib
import contextlib
import asyncio
import requests
@contextlib.asynccontextmanager
async def get_google():
    # Connect to the database
    vals = requests.get('http://www.google.com')
    try:
        # Yield the connection
        yield vals
    finally:
        # Close the connection
        print('closed')
async def main():
    # Use the context manager to get a database connection
    async with get_google() as google_txt:
        print(google_txt.text)   
# run one of the below 2 lines: need to start a new event loop if you've run loop.close()
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()

In [None]:
# this is a perfectly fine way to make a custom exception
class MyException(Exception):
    """Docstring for what the exception means"""

raise MyException({"message":"My hovercraft is full of animals", "animal":"eels"})


In [None]:
# more complicated way https://stackoverflow.com/questions/1319615/proper-way-to-declare-custom-exceptions-in-modern-python
try:
    raise MyException({"message":"My hovercraft is full of animals", "animal":"eels"})
except MyException as e:
    details = e.args[0]
    print(details["animal"])


In [None]:
### Q: why make custom exception class? 
### A: more descriptive and meaningful error message; make the error specific to your library

# how to make a custom exception class?
class MyAppValueError(ValueError):
    '''Raise when a specific subset of values in context of app is wrong'''
    def __init__(self, message, foo, *args):
        self.message = message # without this you may get DeprecationWarning
        # Special attribute you desire with your Error, 
        # perhaps the value that caused the error?:
        self.foo = foo         
        # allow users initialize misc. arguments as any other builtin Error
        super(MyAppValueError, self).__init__(message, foo, *args) 

raise MyAppValueError('msg for user', 'something else', 'wasteman')


In [None]:
class NetworkError(Exception):
    def __init__(self, message, errors):
        super().__init__(message)
        self.errors = errors
raise NetworkError('A network error occurred', ['Error 1', 'Error 2'])

In [None]:
# can catch multiple types of error and get info on the error
try:
    int('asfg'+'aa'+'3f2w4g')
except (RuntimeError, TypeError, NameError, ValueError) as e:
    print(type(e))
    print(e.args)
    print('value was asfg')
    print(e) 

In [None]:
# try/except can catch errors which propagate from called functions
def this_fails():
    x = 1/0
try:
    this_fails()
except ZeroDivisionError as err:
    print('Handling run-time error:', err)

finally is executed regardless of whether the statements in the try block fail or succeed. else is executed only if the statements in the try block don't raise an exception.

https://stackoverflow.com/questions/6051934/purpose-of-else-and-finally-in-exception-handling

In [None]:
# if open() in try fails, then exception is run, then runtime returns to rest of try statement: this is 
# true for OSError but not KeyError, which if called *does* skip the else section
for arg in sys.argv[1:]:
    f = None
    d = {4:3}
    try:
        f = open(arg, 'r')
        print('still running after OSError is raised!' + d[3])
        print('a')
    except OSError:
        print('cannot open', arg)
    except KeyError as e:
        print(e)
    else:
        print('hahaha')   
        print(arg, 'has', len(f.readlines()), 'lines')
    finally:
        #print(arg, 'has', len(f.readlines()), 'lines')
        print('ho')
        if f:
            f.close()


In [None]:
# getting info on specific error raised. Also can store 'context' for later
import traceback
d={4:3}
try:
    d[3]
except Exception as e:
    context = {
        'error_type': e.__class__.__name__,
        'error_message': str(e),
        'error_traceback': traceback.format_exc(),
        'error_keys':dir(e),
        'args':e.args
    }
    raise Exception(context)


In [None]:
# raise exception with tracebacks
# More tracebacks here: https://docs.python.org/3/library/traceback.html
import traceback
d={4:3}
try:
    d[3]
except Exception as e:
    context = {
        'error_type': e.__class__.__name__,
        'error_message': str(e),
        'error_traceback': traceback.format_exc(),
        'exception':traceback.print_last(),
        'error_traceback_stack': traceback.extract_stack(),
        #'error_traceback': traceback.format_list(),
        #'error_traceback': traceback.walk_stack(),
        'error_keys':dir(e),
        'args':e.args
    }
    
    # could store traceback here
    
    raise Exception(context)


In [None]:
context['exception']

In [None]:
context['error_traceback_stack']

In [None]:
## write to sqlite db
import sqlite3

# Connect to the database
conn = sqlite3.connect('mydatabase.db')

# Create a cursor object
cursor = conn.cursor()

value1='a'
value2='b'
table_name='haha'

# Execute a CREATE TABLE statement: do this if table doesn't exist yet
#cursor.execute("CREATE TABLE table_name (column1 datatype, column2 datatype)")

# Execute an INSERT statement
cursor.execute("INSERT INTO table_name (column1, column2) VALUES (?, ?)", (value1, value2))

# Commit the changes to the database
conn.commit()

# view all tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
print(cursor.fetchall())

# Close the connection
conn.close()



In [None]:
# warnings: can raise generic warning or a category of warning
import warnings

def function_with_warning():
    warnings.warn("This is a warning message.")
function_with_warning()

def function_with_warning():
    warnings.warn("This is a warning message.", category=UserWarning)
function_with_warning()


In [None]:
# can make a custom class of warning
import warnings

class CustomWarning(Warning):
    print('this warning is all kinds of bad')

def function_with_warning():
    warnings.warn("This is a custom warning message.", category=CustomWarning)

function_with_warning()


In [None]:
# raising custom exception message
class CustomException(Exception):
    def __init__(self, message):
        super().__init__(message)
        self.message = message

def function_that_raises_exception():
    raise CustomException("This is a custom exception message.")

try:
    function_that_raises_exception()
except CustomException as e:
    print(e)


In [None]:
def f():
    excs = [OSError('error 1'), SystemError('error 2')]
    raise ExceptionGroup('there were problems', excs)
f()

In [None]:
# SequenceMatcher class shows how similar two strings are;
# Differ class shows how much they differ

import difflib  
from difflib import SequenceMatcher  
# defining the strings  
str_1 = "Welcome to Javatpoint"  
str_2 = "Welcome to Python tutorial"  
  
# using the SequenceMatcher() function  
my_seq = SequenceMatcher(a = str_1, b = str_2)  
  
# printing the result  
print("First String:", str_1)  
print("Second String:", str_2)  
print("Sequence Matched:", my_seq.ratio())    # the ratio of identical characters in the two strings,

In [None]:

import difflib  
from difflib import Differ  
  
# defining the strings  
str_1 = "They would like to order a soft drink"  
str_2 = "They would like to order a corn pizza"  
  
# using the splitlines() function  
lines_str1 = str_1.splitlines()  
lines_str2 = str_2.splitlines()  
  
# using the Differ() and compare() function  
dif = difflib.Differ()  
my_diff = dif.compare(lines_str1, lines_str2)  
  
# printing the results  
print("First String:", str_1)  
print("Second String:", str_2)  
print("Difference between the Strings:")  
print('\n'.join(my_diff)) 

In [None]:
'\n'.join(dif.compare(lines_str1, lines_str2))

In [None]:
# get words which are similar to given string
# uses a cutoff of % of letters which are in both strings

import difflib  
from difflib import get_close_matches  
  
# using the get_close_matches method  
my_list = get_close_matches('mas', ['master', 'mask', 'duck', 'cow', 'mass', 'massive', 'python', 'butter'])  
  
# printing the list  
print("Matching words:", my_list)  

In [None]:
# unified_diff to see what would have to be dropped and added to go from first string to second
import sys  
import difflib  
from difflib import unified_diff  
  
# defining the string variables  
str_1 = ['Mark\n', 'Henry\n', 'Richard\n', 'Stella\n', 'Robin\n', 'Employees\n']  
str_2 = ['Arthur\n', 'Joseph\n', 'Stacey\n', 'Harry\n', 'Emma\n', 'Employees\n']  
  
# using the unified_diff() function  
sys.stdout.writelines(unified_diff(str_1, str_2))  

In [None]:
import sys  
import difflib  
from difflib import context_diff  
  
# defining the string variables  
str_1 = ['Mark\n', 'Henry\n', 'Richard\n', 'Stella\n', 'Robin\n', 'Employees\n']  
str_2 = ['Arthur\n', 'Joseph\n', 'Stacey\n', 'Harry\n', 'Emma\n', 'Employees\n']  
  
# using the context_diff() function  
sys.stdout.writelines(context_diff(str_1, str_2))  

In [None]:
# use file.readline to read one line at a time, file.readlines() to read all
# file.write() to append one line at a time
import readline

for char in 'abcdef':
    with open("filename.txt", "a") as file:
        file.write(f'{char}\n')
    
# Open the file in read-only mode
with open("filename.txt", "r") as file:
    # Read a single line of text from the file
    line = file.readline()
    print(line)
    line = file.readline()
    print(line)

In [None]:
import unicodedata
unicodedata.name('B')
unicodedata.name('{')


In [None]:
import textwrap
textwrap.shorten("Hello world", width=10, placeholder="...")

In [None]:
s = '''\
    hello
      world
    '''
print(textwrap.dedent(s))

In [None]:
## giving rules to a TextWrapper class then applying it
wrapper = textwrap.TextWrapper()
wrapper.initial_indent = "*hah"
wrapper.fill(s)

In [None]:
from zoneinfo import ZoneInfo
from datetime import datetime, timedelta

dt = datetime(2020, 10, 31, 12, tzinfo=ZoneInfo("America/Los_Angeles"))
print(dt)

dt.tzname()


In [None]:
import zoneinfo 
len(zoneinfo.available_timezones())

In [None]:
zoneinfo.TZPATH

In [None]:
import weakref

class MyClass:
    def __init__(self, value):
        self.value = value
        self.current_btcs = 0
    
    def multi(self):
        self.value = self.value * 2
        
    def minus(self):
        self.value -= 5
    
    def buy_btc(self, amount, price):
        self.current_btcs += amount
        self.value -= amount * price
        

obj = MyClass(10)
weak_ref = weakref.ref(obj)

obj = weak_ref()
print(obj.value)  # prints 10
print(obj.multi())  # prints 10
print(obj.value)  # prints 10


In [None]:
aaa = 10

In [None]:
thing = MyClass(10)
thing.buy_btc(2, 3)

In [None]:
print(thing.value)
print(thing.current_btcs)

In [None]:
from enum import Enum
Color = Enum('Color', ['RED', 'GREEN', 'BLUE'])
for c in Color:
    print(c.name)
    print(c.value)
list(Color)

In [None]:
keywords = Enum('Keys', {'delta':'red', 'alpha':'blue'})
for c in keywords:
    print(c.name)
    print(c.value)


In [None]:
# topological sort only works if it's a directed acyclic grap
from graphlib import TopologicalSorter
graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}
ts = TopologicalSorter(graph)
tuple(ts.static_order())


In [None]:
### Can assign a function to a new types class as a method
import types

# Define a new type called 'MyType'
MyType = types.new_class('MyType')

# Define a method for the type
def greet(self):
    print(f'Hello, my name is {self.name}')

# Set the method as an attribute of the type
MyType.greet = greet

# Create an instance of the type
obj = MyType()
obj.name = 'Alice'

# Call the method on the instance
obj.greet()  # prints "Hello, my name is Alice"


In [None]:
from types import GenericAlias

list[int] == GenericAlias(list, (int,))

dict[str, int] == GenericAlias(dict, (str, int))

In [None]:
import types
IntList = types.GenericAlias('IntList', list[int])

def test_function(x: IntList):
    """Add 1 to all values in list of ints"""
    if not isinstance(x[0], int):
        raise TypeError('Input must be an integer')
    return [c+1 for c in x]

test_function([1,2,3])


In [None]:
from collections.abc import Sequence
Sequence

In [None]:
from dataclasses import dataclass

@dataclass
class InventoryItem:
    """Class for keeping track of an item in inventory."""
    name: str
    unit_price: float
    quantity_on_hand: int = 0

    def total_cost(self) -> float:
        return self.unit_price * self.quantity_on_hand
    
InventoryItem('adam', 15, 2).total_cost()

In [None]:
import itertools
for i in itertools.repeat(10, 3):
    print(i)

In [None]:
for i in itertools.accumulate([1,2,3,4,5]):
    print(i)

In [None]:
for i in itertools.chain('ABC', 'DEF','aaa'):
    print(i)

In [None]:
# filter where is a 1 not 0 in corresponding list
for i in itertools.compress('ABCDEF', [1,0,1,0,1,1]):
    print(i)


In [None]:
for p in itertools.permutations('ABCD', 2):
    print(p)

In [None]:
# all combinations in one order
for p in itertools.combinations('ABCD', 2):
    print(p)

In [None]:
# same as memoise
from functools import cache
@cache
def factorial(n):
    return n * factorial(n-1) if n else 1
print(factorial(2))
print(factorial.cache_info())
print(factorial.cache_parameters())
print(factorial.cache_clear())
print(factorial.cache_info())


In [None]:
# the @property decorator turns the voltage() method into a “getter” for a read-only attribute with the same name, 
class Parrot:
    def __init__(self):
        self._voltage = 100000

    @property
    def voltage(self):
        """Get the current voltage."""
        return self._voltage * 2
    
p = Parrot()
p.voltage

In [None]:
class C:
    def __init__(self):
        self._x = 'ha'

    @property
    def x(self):
        """I'm the 'x' property."""
        return self._x

    # allows setting value of Cinstance.x
    @x.setter
    def x(self, value):
        self._x = value
   
    # allows deleting value of Cinstance.x
    @x.deleter
    def x(self):
        del self._x
    
c = C()
del c.x
c.x = 4
c.x


In [None]:
# lru_cache = memoizing callable that saves up to the maxsize most recent calls
from functools import lru_cache 
@lru_cache
def count_vowels(sentence):
    return sum(sentence.count(vowel) for vowel in 'AEIOUaeiou')
count_vowels('hahahan os')

In [None]:
import functools
@functools.total_ordering
class Student:
    def _is_valid_operand(self, other):
        return (hasattr(other, "lastname") and
                hasattr(other, "firstname"))
    def __eq__(self, other):
        if not self._is_valid_operand(other):
            return NotImplemented
        return ((self.lastname.lower(), self.firstname.lower()) ==
                (other.lastname.lower(), other.firstname.lower()))
    def __lt__(self, other):
        if not self._is_valid_operand(other):
            return NotImplemented
        return ((self.lastname.lower(), self.firstname.lower()) <
                (other.lastname.lower(), other.firstname.lower()))
s = Student()
s.lastname = 'aa'
s.firstname = 'bb'
print(s < s)
print(s >= s)

In [None]:
# reduce() takes a start value x (0 in this case) does something with each y value, accumulating 
from functools import reduce
reduce(lambda x, y: x+y, [1, 2, 3, 4, 5], 0)

In [None]:
## @singledispatch makes one fun(), and depending on the input type a different version of the below will be 
## called. Basically: overloading the function
from functools import singledispatch
@singledispatch
def fun(arg, verbose=False):
    if verbose:
        print("Let me just say,", end=" ")
    print(arg)
    
@fun.register
def _(arg: int | float, verbose=False):
    if verbose:
        print("Strength in numbers, eh?", end=" ")
    print(arg)

from typing import Union
@fun.register
def _(arg: Union[list, set], verbose=False):
    if verbose:
        print("Enumerate this:")
    for i, elem in enumerate(arg):
        print(i, elem)

In [None]:
import functools

def log_function(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        print(f"Called {func.__name__} with arguments {args} and keyword arguments {kwargs}")
        print(f"{func.__name__} returned {result}")
        return result
    return wrapper

@log_function
def add(x, y):
    return x + y

print(add(1, 2))
# Output:
# Called add with arguments (1, 2) and keyword arguments {}
# add returned 3


In [None]:
from pathlib import Path
p = Path('.')
print([x for x in p.iterdir() if x.is_dir()])
print([x for x in p.iterdir()])

# get .md files in this directory tree
list(p.glob('**/*.md'))

path = Path('/tmp')
for item in path.iterdir():
    print(item)

path = Path('/tmp/example.txt')
path.write_text('Hello, world!\nhaha')
    
path = Path('/tmp/example.txt')
with path.open() as f:
    print(f.readline())
    print(f.readline())
    
path = Path('/tmp/example.txt')
if path.exists():
    print('The file exists')
else:
    print('The file does not exist')

contents = path.read_text()
print(f'contents: {contents}')


In [None]:
import fileinput
with fileinput.input(files=['filename.txt', 'filename.txt']) as f:
    for line in f:
        print(line)

In [None]:
path = Path()
for item in path.iterdir():
    print(item)

In [None]:
import filecmp
# returns match, mismatch, errors for specified files (3rd param) between 2 directories (params 1 and 2)
filecmp.cmpfiles('', '', ['filename.txt'])

In [None]:
filecmp.cmp('filename.txt', 'filename.txt')

In [None]:
import tempfile

with tempfile.TemporaryFile() as fp:
    fp.write(b'Hello world!')
    fp.seek(0)
    fp.read()

with tempfile.TemporaryDirectory() as tmpdirname:
    print('created temporary directory', tmpdirname)


In [None]:
import linecache
linecache.getline(next(p.glob('**/*.md')).name, 4)

In [None]:
dir(next(p.glob('**/*.md')))

In [None]:
next(p.glob('**/*.md')).name

In [None]:
import pickle
pickle.dumps('sometext', protocol=pickle.HIGHEST_PROTOCOL)


In [None]:
# use pickletools.optimize. Is:
# shorter, takes less transmission time, requires less storage space, and unpickles more efficiently.
import pickletools
more_efficient_pickle_bytestring = pickletools.optimize(pickle.dumps('sometext', protocol=pickle.HIGHEST_PROTOCOL))
pickle.loads(more_efficient_pickle_bytestring)

In [None]:
import shelve
with shelve.open('spam') as db:
    print(db.get('eggs'))
    db['eggs2'] = 'eggs_and_bacon'
    print(db.get('eggs2'))

In [None]:
import shelve
with shelve.open('spam', writeback=True) as db:
    print(db.get('eggs'))
    db.sync()        					 ### adding this line to manually sync db
    db['eggs2'] = 'eggs_and_bacon'
    print(db.get('eggs2'))

In [None]:
import dbm

# Open database, creating it if necessary.
with dbm.open('cache', 'c') as db:

    # Record some values
    db[b'hello'] = b'there'
    db['www.python.org'] = 'Python Website'
    db['www.cnn.com'] = 'Cable News Network'

    # Note that the keys are considered bytes now.
    assert db[b'www.python.org'] == b'Python Website'
    # Notice how the value is now in bytes.
    assert db['www.cnn.com'] == b'Cable News Network'

    # Often-used methods of the dict interface work too.
    print(db.get('python.org', b'not present'))
    
    # Storing a non-string key or value will raise an exception (most
    # likely a TypeError).
    db['www.yahoo.com'] = 'a'

In [None]:
os.listdir()

In [None]:
## sqlite
import sqlite3
con = sqlite3.connect("tutorial.db")
cur = con.cursor()
cur.execute("CREATE TABLE movie(title, year, score)")
res = cur.execute("SELECT name FROM sqlite_master")
print(res.fetchone())
res = cur.execute("SELECT name FROM sqlite_master WHERE name='spam'")
print(res.fetchone() is None)
cur.execute("""
    INSERT INTO movie VALUES
        ('Monty Python and the Holy Grail', 1975, 8.2),
        ('And Now for Something Completely Different', 1971, 7.5)
""")
con.commit()
data = [
    ("Monty Python Live at the Hollywood Bowl", 1982, 7.9),
    ("Monty Python's The Meaning of Life", 1983, 7.5),
    ("Monty Python's Life of Brian", 1979, 8.0),
]
cur.executemany("INSERT INTO movie VALUES(?, ?, ?)", data)
con.commit() 
for row in cur.execute("SELECT year, title FROM movie ORDER BY year"):
    print(row)

In [None]:
# making a python function and calling it in sqlite with con.create_functionw
import hashlib
def md5sum(t):
    return hashlib.md5(t).hexdigest()
con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum)
for row in con.execute("SELECT md5(?)", (b"foo",)):
    print(row)

In [None]:
# using context manager
with sqlite3.connect("tutorial.db") as con:
    print(con.execute("select * from movie").fetchall())

In [None]:
import zlib, gzip, lzma, bz2
print(zlib.compress(b'aa'))
print(gzip.compress(b'aa'))
print(lzma.compress(b'aa'))
print(bz2.compress(b'aa'))


print(zlib.decompress(zlib.compress(b'aa')))
print(bz2.decompress(bz2.compress(b'aa')))

In [None]:
import configparser
config = configparser.ConfigParser()
config['one.thing'] = {'ServerAliveInterval': '45',
                     'Compression': 'yes',
                     'CompressionLevel': '9'}
config['bitbucket.org'] = {}
config['bitbucket.org']['User'] = 'hg'
config['topsecret.server.com'] = {}
topsecret = config['topsecret.server.com']
topsecret['Port'] = '50022'     # mutates the parser
topsecret['ForwardX11'] = 'no'  # same here
config['DEFAULT']['ForwardX11'] = 'yes'
with open('example.ini', 'w') as configfile:
  config.write(configfile)

In [None]:
# read in ini file and convert to dict
config = configparser.ConfigParser()
config.read('example.ini')
config_dict = {s:dict(config.items(s)) for s in config.sections()}
config_dict

In [None]:
config_dict['one.thing']

In [None]:
# hashlib
from hashlib import sha256, blake2b, md5
s = sha256()
s.update(b"Nobody inspects")
print(s.digest())


# or the same
print(sha256(b"Nobody inspects").digest())
print(sha256(b"Nobody inspects").hexdigest())
print(blake2b(b"Nobody inspects").hexdigest())
print(md5(b"Nobody inspects").hexdigest())

print(hashlib.algorithms_available)

In [None]:
import hmac
hmac.digest(b'key1',msg=b'haha',digest='md5')


In [None]:
import secrets
# make random bytestring of length 16
secrets.token_bytes(16) 
secrets.token_hex(16)


# generate 8 letter alphanumeric password
import string
import secrets
alphabet = string.ascii_letters + string.digits
password = ''.join(secrets.choice(alphabet) for i in range(8))
print(password)

# Generate a hard-to-guess temporary URL containing a security token suitable for password recovery applications:
import secrets
url = 'https://example.com/reset=' + secrets.token_urlsafe()
url


# create random set of words
import secrets
# On standard Linux systems, use a convenient dictionary file.
# Other platforms may need to provide their own word-list.
with open('/usr/share/dict/words') as f:
    words = [word.strip() for word in f]
    password = ' '.join(secrets.choice(words) for i in range(4))

In [None]:
import os
os.stat(path)
os.name
os.fsencode('haha')

# get env variables
os.environ           

# get name of user
os.getlogin()

# get pid of this process
os.getpgrp()

# create new empty file
open('newfile.txt', 'w')



In [None]:
os.path.expanduser('~/Desktop/pythonlearning2023')

In [None]:
import io
f = io.StringIO("some initial text data")
for i in f:
    print(i)
    
f = io.BytesIO(b"some initial binary data: \x00\x01")
for i in f:
    print(i)

In [None]:
# reads in raw file: docs say its rare you'll do this
f = open("myfile.jpg", "rb", buffering=0)

In [None]:
from _io import (DEFAULT_BUFFER_SIZE, BlockingIOError, UnsupportedOperation,
                 open, open_code, FileIO, BytesIO, StringIO, BufferedReader,
                 BufferedWriter, BufferedRWPair, BufferedRandom,
                 IncrementalNewlineDecoder, text_encoding, TextIOWrapper)


In [None]:
import logging
logging.warning('Watch out!')

In [None]:
# this needs to run in python terminal: won't work in jupyter
import logging
import os
logfilepath=filename=os.getcwd()+'/example.log'
logging.basicConfig(filename=logfilepath, encoding='utf-8', level=logging.DEBUG)
logging.debug('This message should go to the log file')
logging.info('So should this')
haha = 3
logging.warning(f'And this, too {haha}')
logging.error('And non-ASCII stuff, too, like Øresund and Malmö')
print(logfilepath)

In [None]:
logging.getLogger(__name__)

In [None]:
import getpass
a = getpass.getpass()
print(a)

In [None]:
import curses
screen = curses.initscr()
# Update the buffer, adding text at different locations
screen.addstr(0, 0, "This string gets printed at position (0, 0)")
screen.addstr(3, 1, "Try Russian text: Привет")  # Python 3 required for unicode
screen.addstr(4, 4, "X")
screen.addch(5, 5, "Y")
# Changes go in to the screen buffer and only get
# displayed after calling `refresh()` to update
screen.refresh()
curses.napms(3000)
curses.endwin()

In [None]:
import platform
platform.architecture()
platform.machine()
platform.node()
platform.platform()
platform.processor()
platform.python_build()
platform.python_compiler()
platform.python_version()
platform.system()
platform.uname()

In [None]:
import threading

# count active threads
threading.active_count()

In [None]:
# list active threads
for i in threading.enumerate():
    print(i)

In [None]:
threading.main_thread()

In [None]:
import threading
from numba.typed import Dict
import numpy as np

# Create a global list
my_list = [1, 2, 3, 4, 5]

# Define a function that will be run by the thread
def update_list(n):
    global my_list
    my_list = [i*n for i in my_list]

# Create a thread for each operation
threads = []
for i in range(5):
    t = threading.Thread(target=update_list, args=(i+1,))
    threads.append(t)

print(threads)

# Start the threads
for t in threads:
    t.start()

# Wait for the threads to finish
for t in threads:
    t.join()

# Print the final state of the list
print(my_list)

In [None]:
## writing to typed dict in parallel. 
# Could read distributed GCS files to append as memory is shared
import numba
value_float = numba.float64

typed_dict = Dict.empty(
    key_type=value_float,
    value_type=value_float
    )

def update_dict(n):
    global typed_dict
    typed_dict[n] = n + 1

# Create a thread for each operation
threads = []
for i in range(8):
    t = threading.Thread(target=update_dict, args=(i+1,))
    threads.append(t)

# Start the threads
for t in threads:
    t.start()

# Wait for the threads to finish
for t in threads:
    t.join()

print(typed_dict)


In [None]:
from multiprocessing import Process
def print_func(continent='Asia'):
    print('The name of continent is : ', continent)
if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()
    # instantiating process with arguments
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()
    print(proc)
    # complete the processes
    for proc in procs:
        proc.join()

In [None]:
# target function
from multiprocessing import Semaphore, Process
def task(semaphore, number):
    # attempt to acquire the semaphore
    with semaphore:
        # simulate computational effort
        value = random()
        sleep(value)
        # report result
        print(f'Process {number} got {value}')
semaphore = Semaphore(2)
processes = [Process(target=task, args=(semaphore, i)) for i in range(10)]
# start child processes
for process in processes:
    process.start()
# wait for child processes to finish
for process in processes:
    process.join()


In [None]:
import ctypes
rows = 3
cols = 4
arr = [[0] * cols for _ in range(rows)]

# Create a pointer type for the elements of the array
IntArrayType = ctypes.c_int * cols

# Create an array of pointers to the elements of the array
pointers = (IntArrayType * rows)()

# Initialize the pointers and the elements of the array
for i in range(rows):
    pointers[i] = IntArrayType(*arr[i])

print(pointers[1][2])


In [None]:
current_thread = threading.current_thread()
#thread_id = current_thread.ident

In [None]:
from multiprocessing.shared_memory import ShareableList
a = ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
a[2] = -78.5
len(a)
a.shm


In [None]:
import asyncio
import requests
async def do_thing(i):
    print('hi')
    res = requests.get('https://www.google.com').text
    return i

tasks = []
for i in range(100):  
    task = asyncio.create_task(do_thing(i))
    tasks.append(task)

for coro in asyncio.as_completed(tasks):
    earliest_result = await coro 
    print(earliest_result)   # this only prints when above line is done for all 100. 
                            # earliest_result is an int, but all 100 vals are printed (it may be different for each 
                            # coroutine)

In [None]:
asyncio.get_running_loop()

In [None]:
##### Searching big logs. 

# on recommmendation is grep and awk, as they are v fast. Or mmap as below
# https://stackoverflow.com/questions/66071560/searching-through-a-large-text-or-log-file-10gb
import logging
import os
import mmap
import re
logfilepath=filename=os.getcwd()+'/example.log'
logging.basicConfig(filename=logfilepath, encoding='utf-8', level=logging.DEBUG)
logging.debug('This message should go to the log file')
logging.info('So should this')
haha = 3
logging.warning(f'And this, too {haha}')
logging.error('And non-ASCII stuff, too, like Øresund and Malmö')
print(f'written to {logfilepath}')

f = open(logfilepath, "r")
mm = mmap.mmap(f.fileno(), 0, prot=mmap.PROT_READ)
search_res = re.search(b"message", mm)
print([search_res.start(), search_res.end()])

# findall returns list of matched patterns by default
findall_res = re.findall(b"message", mm)
findall_res


In [None]:
with open(logfilepath, "r") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    print(mm.rfind(b'ERR'))

In [None]:
# Get whole line of log featuring 'ASCII'
import re
import mmap
import os
logfilepath=filename=os.getcwd()+'/example.log'

with open(logfilepath, "r") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    # Search for the pattern "error" in the file
    start = mm.find(b"ASCII")
    end = mm.find(b"\n", start)
    
    # find first '\n' prior to 'start'
    line_start = mm.rfind(b'\n', 0, start)
    
    error_lines = []
    
    while start != -1 and end != -1:
        
        # Extract the line containing the pattern and decode it
        line = mm[line_start:end].decode()
        
        # Append the line to the list of error lines
        error_lines.append(line)
        
        # Search for the next occurrence of the pattern
        start = mm.find(b"ASCII", end)
        previous_instance_end = end
        end = mm.find(b"\n", start)
        line_start = mm.rfind(b'\n', previous_instance_end, start)
        
    # Close the memory-mapped file
    mm.close()

# Print the list of error lines
print(error_lines)

In [None]:
with open(logfilepath, "r+b") as f:
    with mmap.mmap(f.fileno(), 0) as mm:
        myline = mm.readline()
        while myline:
            print(myline)
            myline = mm.readline()
        mm.close()   


In [None]:
myline = myfile.readline()
while myline:
    print(myline)
    myline = myfile.readline()
myfile.close()   

In [None]:
import unittest

class DefaultWidgetSizeTestCase(unittest.TestCase):
    def test_default_widget_size(self):
        widget = Widget('The widget')
        self.assertEqual(widget.size(), (50, 50))

In [None]:
import unittest
import numpy as np

def Widget():
    ar = np.random.rand(50,50)
    print(ar.shape)
    return ar

Widget()

In [None]:
class NumbersTest(unittest.TestCase):
    def test_even(self):
        """
        Test that numbers between 0 and 5 are all even.
        """
        for i in range(0, 6):
            with self.subTest(i=i):
                self.assertEqual(i % 2, 0)
NumbersTest().test_even()

In [None]:
def greeting(name: str) -> str:
    return 'Hello ' + name
greeting('adam')

In [None]:
Vector = list[float]

def scale(scalar: float, vector: Vector) -> Vector:
    """
    scale = input float value
    vector = the thing we multiply by
    """
    return [scalar * num for num in vector]

# passes type checking; a list of floats qualifies as a Vector.
new_vector = scale(2.0, [1.0, -4.2, 5.4])
new_vector

In [None]:
from typing import NewType

# Create a new type called "UserId"
UserId = NewType('UserId', int)

# Define a function that takes a UserId as an argument
def get_user_name(user_id: UserId) -> str:
    # Implementation goes here
    pass

# Create a value of type UserId
user_id = UserId(123)

# Call the function with the UserId value
user_name = get_user_name(user_id)


In [None]:
import collections

# Create a vocabulary by mapping words to indices
vocab = {}
vocab.update({word: index for index, word in enumerate(sorted(set([word for sentence in text for word in sentence.split()])))})




In [None]:
vocab

In [None]:
import torch
import torch.nn as nn

# Define the embedding layer
embedding_layer = nn.Embedding(num_embeddings=1000, embedding_dim=128)

# Define some input text
text = ["This is some text <END>", "Here is another sentence boom!"]

# Create a vocabulary by mapping words to indices
vocab = {}
vocab.update({word: index for index, word in enumerate(sorted(set([word for sentence in text for word in sentence.split()])))})

# Convert the text to a tensor
text_tensor = torch.tensor([[vocab[word] for word in sentence.split()] for sentence in text])

# Pass the text tensor through the embedding layer
embeddings = embedding_layer(text_tensor)

print(f'embeddings shape: {embeddings.shape}')
print(f'text_tensor shape: {text_tensor.shape}')
print(embeddings)


In [None]:
text_tensor

In [None]:
import logging
import os
import timeit
logfilepath=filename=os.getcwd()+'/example.log'
logging.basicConfig(filename=logfilepath, encoding='utf-8', level=logging.DEBUG)
timed = timeit.repeat("8*8", repeat=2, number=1)   # number = number of times func will be called
logging.info(f'So should this time: {timed}')


In [None]:
timed

In [None]:
def gen_nums(): 
    n=0
    while n < 4: 
        yield n
        n += 1

for i in gen_nums():
    print(i)

In [None]:
import unittest

class Angle():
    def __init__(self, angle):
        self.degrees = angle
        
    def __repr__(self):
        return str(self.degrees) + ' degrees'

Angle(4)

In [None]:
class Prefixer:
    def __init__(self, prefix):
        self.prefix = prefix 
    def __call__(self, message):
        return self.prefix + message

Prefixer('main')(' message added')

In [None]:
# making a decorator from a class
class PrintLog:
    def __init__(self, func):
        self.func = func
    def __call__(self, *args, **kwargs):
        print('CALLING: {}'.format(self.func.__name__)) 
        return self.func(*args, **kwargs)

@PrintLog
def nf(strr):
    print(strr)
nf('hi')

In [None]:
## our own decorator classes can inherit from other decorator classes
import sys
class ResultAnnouncer:
    stream = sys.stdout 
    prefix = "RESULT"
    def __init__(self, func):
        self.func = func
    def __call__(self, *args, **kwargs):
        value = self.func(*args, **kwargs)
        self.stream.write('{}: {}\n'.format(self.prefix,value))
        return value

class StdErrResultAnnouncer(ResultAnnouncer): 
    stream = sys.stderr
    prefix = "ERROR QS"
    
@StdErrResultAnnouncer
def pt():
    print('po')
pt()

In [None]:
class CountCalls:
    def __init__(self, func):
        self.func = func
        self.count = 1
    def __call__(self, *args, **kwargs):
        print('# of calls: {}'.format(self.count)) 
        self.count += 1
        return self.func(*args, **kwargs)

caller = CountCalls(print)
for i in range(3):
    caller('aaa')

In [None]:
# here 'klass' is the name of the class being decorated
def autorepr(klass):
    def klass_repr(self):
        return '{}, {}'.format(klass.__name__ + ' instance', self.value)
    klass.__repr__ = klass_repr
    return klass

@autorepr
class Penny():
    def __init__(self, value):
        self.value = value

print(Penny(3))

In [None]:
class Person:
    def __init__(self, firstname, lastname):
        self.firstname = firstname
        self.lastname = lastname

    #@property
    def fullname(self):
        return self.firstname + " " + self.lastname
    
Person('john', 'smith').fullname()

In [None]:
class Ticket:
    def __init__(self, _price):
        self._price = _price
        
    def __set__(self, price):
        self._price = _price
    @property
    def price(self):
        return self._price 
    @price.setter
    def price(self, new_price):
    # Only allow positive prices. 
        print(f'new_price: {new_price}')
        if new_price < 0:
            raise ValueError("Nice try") 
        self._price = new_price
        
t = Ticket(10)
t.price(30)

In [None]:
from dataclasses import dataclass
@dataclass
class NeoXArgsDeepspeedConfig():
    deepspeed: bool = True
    train_batch_size: int = None
        
conf = NeoXArgsDeepspeedConfig()
conf.deepspeed

In [None]:
from typing import List

class Solution:
    def pivotIndex(self, nums: List[int]) -> int:
        
        # add these so they can be dropped later
        nums.insert(0,0)
        nums.append(0)
                
        left_sum = nums[0]
        right_sum = nums[-1]
        left_pos = 0
        left_numbers_used = 1
        right_pos = len(nums) - 1
        right_numbers_used = 1

        while (left_numbers_used + right_numbers_used) < len(nums) - 1:
            if left_sum < right_sum:
                left_pos += 1
                left_sum += nums[left_pos]
                left_numbers_used += 1
            if left_sum > right_sum:
                right_pos -= 1
                right_sum += nums[right_pos]
                right_numbers_used += 1
                

        if left_sum == right_sum:
            return left_pos + 1
        else:
            return -1
            
Solution().pivotIndex([2,-1,1])

In [None]:
class Solution(object):
    def pivotIndex(self, nums):
        S = sum(nums)
        leftsum = 0
        for i, x in enumerate(nums):
            if leftsum == (S - leftsum - x):
                return i
            leftsum += x
        return -1
    
Solution().pivotIndex([2,-1,1])

In [None]:
nums = [2,-1,1]
for i in enumerate(nums):
    print(i)

In [None]:
class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next
        
import math
from typing import Optional
class Solution:
    def splitListToParts(self, head: Optional[ListNode], k: int) -> List[Optional[ListNode]]:
        l = [[] for i in range(k)]
        for i, x in enumerate(head):
            ix = i % k
            l[ix].append(x)
        return l

Solution().splitListToParts([1,5], 3)


In [None]:
def selfDividingNumbers(left: int, right: int) -> List[int]:
    def is_self_dividing(value):
        sv = str(value)
        counter = 0
        for letter in sv:
            if letter != '0':
                counter += int(value % int(letter) != 0)
        return counter == 0

    return [i for i in range(left, right) if is_self_dividing(i)]

selfDividingNumbers(1,22)

In [None]:
def isIsomorphic(s: str, t: str) -> bool:

    letter_map = {}
    for i in range(len(s)):
        if s[i] in letter_map.keys():
            print(letter_map[s[i]])
            if letter_map[s[i]] != t[i]:
                return False
            
        if t[i] in letter_map.values():
            try:
                if letter_map[s[i]] != t[i]:
                    return False
            except KeyError:
                return False

        letter_map[s[i]] = t[i]

    new_word = ''
    for letter in s:
        new_word += letter_map[letter]

    return new_word == t
isIsomorphic('badc', 'baeg')

In [None]:
import importlib
fl =importlib.find_loader('numpy')
fl.get_data()

In [None]:
importlib.util.MAGIC_NUMBER

In [None]:
To completely override sys.path create a ._pth file
In the ._pth file specify one line for each path to add to sys.path

import sys
sys.path

In [None]:
import os
os.getenv('PYTHONPATH')

In [None]:
import ipaddress

# returns an ipaddress object
addr = ipaddress.ip_address('192.168.0.1')

# can also make networks, and explode or compress the IP address

In [None]:
list(ipaddress.ip_network('192.0.2.0/29').hosts())  

In [None]:
import faulthandler

def crash_function():
    x = 1 / 0  # division by zero will cause a ZeroDivisionError

faulthandler.enable()  # enable the fault handler

crash_function()


In [None]:
import tracemalloc

tracemalloc.start()

# ... run your application ...

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

print("[ Top 10 ]")
for stat in top_stats[:10]:
    print(stat)

In [None]:
import cProfile
import re
cProfile.run('re.compile("foo|bar")')


In [None]:
import sys

def audit_handler(event, args):
  print(f'Audit event triggered: {event}')
  print(f'Event arguments: {args}')

# Set the audit hook function
sys.addaudithook(audit_handler)

# Trigger an audit event
sys.audit('example_event', {'arg1': 'value1', 'arg2': 'value2'})

In [None]:
import cProfile
import pstats

def my_function():
    1*2
    print('hi')

# Run the code being profiled using cProfile, storing result in obj 'profiling_results'
cProfile.run("my_function()", "profiling_results")

# Load the profiling results into a pstats.Stats object
stats = pstats.Stats("profiling_results")

# Use the pstats.Stats object to analyze the profiling results
stats.strip_dirs()
stats.sort_stats("time")
stats.print_stats(20)


In [None]:
import trace

def my_function(x, y):
    return x + y

# Start tracing
trace.trace(trace=1, count=1)

# Call the function
result = my_function(1, 2)

# Stop tracing
trace.trace(trace=0)

# Print the trace output
print(trace.results())


In [2]:
import sys
dir(sys.stdout)

['__abstractmethods__',
 '__class__',
 '__del__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__iter__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__next__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '_abc_impl',
 '_buffer',
 '_checkClosed',
 '_checkReadable',
 '_checkSeekable',
 '_checkWritable',
 '_exc',
 '_fid',
 '_flush',
 '_flush_buffer',
 '_flush_pending',
 '_io_loop',
 '_is_master_process',
 '_isatty',
 '_master_pid',
 '_new_buffer',
 '_original_stdstream_copy',
 '_schedule_flush',
 '_setup_stream_redirects',
 '_should_watch',
 '_subprocess_flush_pending',
 '_watch_pipe_fd',
 'close',
 'closed',
 'detach',
 'echo',
 'encoding',
 'errors',
 'fileno',
 'flush',
 'flush_interval',
 'flush_timeout',
 'isatty',
 'name',
 'newlines',
 'parent_he

In [13]:
sys.get_coroutine_origin_tracking_depth()

0

In [14]:
sys.api_version

1013

In [19]:
import asyncio
import contextlib

async def openasync(file, mode):
    return open(file, mode)


@contextlib.asynccontextmanager
async def open_file(file, mode):
    f = await openasync(file, mode)
    try:
        yield f
    finally:
        await f.close()


async def main():
    async with open_file('test.txt', 'w') as f:
        await f.write('Hello, World!')

await main()


TypeError: object NoneType can't be used in 'await' expression

In [21]:
asyncio.get_event_loop_policy()

<asyncio.unix_events._UnixDefaultEventLoopPolicy at 0x7fda523251c0>

In [38]:
import asyncio
import aiohttp
from time import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        # Create a list of tasks to run concurrently
        tasks = [fetch(session, f'http://google.com') for i in range(100)]
        # Use asyncio.gather to run the tasks concurrently
        results = await asyncio.gather(*tasks)
        return results

t1 = time()
res = await main()
time() - t1

0.6087310314178467

In [49]:
import numpy as np
p = np.random.rand(5,5)
p

array([[0.60950889, 0.19458971, 0.56310982, 0.67851725, 0.44799328],
       [0.74685503, 0.49453248, 0.85610427, 0.98553755, 0.78706662],
       [0.82685706, 0.69527014, 0.69865362, 0.73073036, 0.72655735],
       [0.00173131, 0.61951283, 0.47658499, 0.52267093, 0.35264791],
       [0.98131821, 0.91268901, 0.75464793, 0.29506772, 0.05409064]])

In [58]:
p[0:3,0:3]

array([[0.60950889, 0.19458971, 0.56310982],
       [0.74685503, 0.49453248, 0.85610427],
       [0.82685706, 0.69527014, 0.69865362]])

In [84]:
import itertools as it

target = 3
combos = []
candidates = [1,2,3,4]
for i in range(1,len(candidates)+1):
    for k in it.combinations_with_replacement([1,2,3,4],i):
        if sum(k) == target:
            combos.append(k)
outset = set()
for c in combos:
    outset.add(c)
[list(o) for o in outset]

[[1, 2], [3], [1, 1, 1]]

In [90]:
sorted([1,24,3])

[1, 3, 24]

In [87]:
uniq_cands = list(set(candidates))
nc = [[c, c, c, c, c, c, c, c] for c in uniq_cands]
candidates = []
for c in nc:
    candidates.extend(c) 
candidates

[1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 2,
 2,
 2,
 2,
 2,
 2,
 2,
 2,
 3,
 3,
 3,
 3,
 3,
 3,
 3,
 3,
 4,
 4,
 4,
 4,
 4,
 4,
 4,
 4]

In [99]:
ls = []
for k in it.permutations([1,1,2]):
    if k not in ls:
        ls.append(k)
ls

[(1, 1, 2), (1, 2, 1), (2, 1, 1)]

In [96]:
for k in it.product()([1,2,4,5]):
    print(k)

TypeError: 'itertools.product' object is not callable

In [103]:
L=[1,2,3,4]
[L[i:i+j] for i in range(0,len(L)) for j in range(1,len(L)-i+1)]

[[1],
 [1, 2],
 [1, 2, 3],
 [1, 2, 3, 4],
 [2],
 [2, 3],
 [2, 3, 4],
 [3],
 [3, 4],
 [4]]

In [120]:
intervals = [[1,3],[2,6],[8,10],[15,18]]
g1 = intervals[0]
cont = False

gs = []
for i in range(len(intervals) - 1):
    if cont:
        cont = False
        continue
    if g1[1] >= intervals[i + 1][0]:
        g1 = [g1[0], intervals[i + 1][1]]
        cont = True
    else:
        g1 = intervals[i]
        
    
    gs.append(g1)


if i == len(intervals) - 2:
    print('a')
    if g1[1] < intervals[-1][0]:
        gs.append(intervals[-1])

gs

2
a


[[1, 6], [8, 10], [15, 18]]

In [122]:
sorted([k[0] for k in intervals], key=lambda k: intervals[k])

IndexError: list index out of range

In [123]:
[k[0] for k in intervals]

[1, 2, 8, 15]

In [128]:

import numpy as np
s = np.asarray([i[0] for i in intervals])
sort_index = np.argsort(s)
print(sort_index)

[0 1 2 3]


In [131]:
n=3
np.zeros((n,n))

array([[0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

In [133]:
next(type( range(1, n*n + 1))

TypeError: 'type' object is not an iterator

In [142]:
np.random.rand(1)

array([0.66792041])

In [145]:
try:
    1 / 0
except ArithmeticError:
    print('a')

a


In [152]:
import time
time.monotonic()

24749.430128667

In [156]:
# AssertionError


In [177]:
# pip3 install dnspython
import dns.resolver
  
# Finding records
result = [dns.resolver.query('geeksforgeeks.org', 'A'),
          dns.resolver.query('geeksforgeeks.org', 'A')]

# Printing record
for val in result:
    print('A Record : ', val[0].to_text())

A Record :  34.218.62.116
A Record :  34.218.62.116


  result = [dns.resolver.query('geeksforgeeks.org', 'A'),
  dns.resolver.query('geeksforgeeks.org', 'A')]


In [176]:
result[0].to_text()

'34.218.62.116'

In [164]:
import socket
import ssl

hostname = 'www.python.org'
# PROTOCOL_TLS_CLIENT requires valid cert chain and hostname
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
print(f'context certs: {context.load_default_certs()}')
#context.load_verify_locations('path/to/cabundle.pem')

with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock:
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
        print(ssock)
        print(ssock.version())


context certs: None
<ssl.SSLSocket fd=136, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 0)>
None


In [204]:
import asyncio
from time import sleep 

# define a coroutine
async def custom_coro():
    print('a coro is occuring')
    return 3

# create a task from said coroutine
task = asyncio.create_task(custom_coro())

# allow task to run: it will run without this, but not immediately
# await task
print(task)

# check if a task is done
if task.done():
    print('done')

# check if a task was canceled
if task.cancelled():
    print('cancelled')
    
if not task.done():
    await task
res = task.result()

try:
    # get the return value from the wrapped coroutine
    value = task.result()
    print(f'value: {value}')
except asyncio.CancelledError as e:
    print(e)

<Task pending name='Task-5607' coro=<custom_coro() running at /var/folders/x2/bt81rqpj7pl_j7fczgml3pd80000gn/T/ipykernel_41239/2514762197.py:5>>
a coro is occuring
value: 3


In [207]:
import asyncio

class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()




RuntimeError: This event loop is already running