In [49]:
import random
import import_ipynb

from typing import List, Dict
from collections import Counter
import math

import matplotlib.pyplot as plt

In [50]:
def bucketize(point: float, bucket_size: float) -> float:
    """Floor `point` down to the nearest multiple of `bucket_size`.

    The returned value is the lower edge of the bucket that contains `point`.
    """
    bucket_index = math.floor(point / bucket_size)
    return bucket_size * bucket_index

def make_histogram(points: List[float], bucket_size: float) -> Dict[float, int]:
    """Count how many points fall into each bucket.

    Every point is mapped to its bucket with `bucketize`; the returned Counter
    maps each bucket's lower edge to the number of points that landed in it.
    """
    histogram: Dict[float, int] = Counter()
    for point in points:
        histogram[bucketize(point, bucket_size)] += 1
    return histogram

def plot_histogram(points: List[float], bucket_size: float, title: str = ""):
    """Bucket `points`, draw the counts as a bar chart and show the figure.

    Each bar is `bucket_size` wide and is positioned at its bucket's value;
    `title` becomes the chart title.
    """
    bucket_counts = make_histogram(points, bucket_size)
    bucket_starts = bucket_counts.keys()
    bar_heights = bucket_counts.values()

    plt.bar(bucket_starts, bar_heights, width=bucket_size)
    plt.title(title)
    plt.show()

    

In [51]:
# exemplo de um conjunto de dados

from probability import inverse_normal_cdf

In [52]:
# Seed the generator so the samples below are the same on every run.
# The two lists draw from the same generator, so their order matters.
random.seed(0)

# uniform between -100 and 100
uniform = [200 * random.random() - 100 for _ in range(10000)]

# normal distribution with mean 0 and standard deviation 57
normal = [57 * inverse_normal_cdf(random.random()) for _ in range(10000)]

# plot_histogram(uniform, 10, "Uniform Histogram")

In [53]:
# plot_histogram(normal, 10, "Normal Histogram")

In [54]:
# dados com mais de uma dimensão

def random_normal() -> float:
    """Return a random draw obtained by passing a uniform sample through `inverse_normal_cdf`."""
    uniform_draw = random.random()
    return inverse_normal_cdf(uniform_draw)

# xs = [random_normal() for _ in range(1000)]
# ys1 = [x + random_normal() / 2 for x in xs]
# ys2 = [-x + random_normal() / 2 for x in xs]

# plt.scatter(xs, ys1, marker='.', color='black', label='ys1')
# plt.scatter(xs, ys2, marker='.', color='gray', label='ys2')
# plt.xlabel('xs')
# plt.ylabel('ys')
# plt.legend(loc=9)
# plt.title('Very Different Joint Distributions')
# plt.show()

In [55]:
from statistics import correlation

# print(correlation(xs, ys1))
# print(correlation(xs, ys2))

In [56]:
# muitas dimensões
# matriz de correlação

from linear_algebra import Matrix, Vector, make_matrix

def correlation_matrix(data: List[Vector]) -> Matrix:
    """Build the len(data) x len(data) matrix of pairwise correlations.

    Entry (i, j) is correlation(data[i], data[j]).
    """
    size = len(data)
    return make_matrix(size, size, lambda i, j: correlation(data[i], data[j]))

In [57]:
# corr_data é uma lista com quatro vetores 100-d

# corr_data = [list(col) for col in zip(*corr_rows)]

# num_vectors = len(corr_data)
# fig, ax = plt.subplots(num_vectors, num_vectors)

# for i in range(num_vectors):
#     for j in range(num_vectors):

#         # disperse a column_j no eixo x ea column_i no eixo y
#         if i != j: ax[i][j].scatter(corr_data[j], corr_data[i])

#         # a menos que i==j, neste caso, mostre o nome da série
#         else: ax[i][j].annotate("serie" + str(i), (0.5, 0.5), xcoords='axes fraction', ha='center', va='center')

#         # Em seguida, oculte os rótulos dos eixos, exceto pelos gráficos à esquerda e na parte inferior
#         if i < num_vectors - 1: ax[i][j].xaxis.set_visible(False)
#         if j > 0: ax[i][j].yaxis.set_visible(False)

# ax[-1][-1].set_xlim(ax[0][-1].get_xlim())        
# ax[0][0].set_ylim(ax[0][1].get_ylim())

# plt.show()


### Usando o Named Tuple

In [58]:
import datetime

# A plain dict can act as a record, but nothing checks its keys or value types.
stock_price = {
    'closing_price': 102.06,
    'date': datetime.date(2014, 8, 29),
    'symbol': 'AAPL'
}

# Dicts are very prone to typos: the misspelled key below is silently added
# as a new entry instead of updating 'closing_price'.
stock_price['cosing_price'] = 103.06

# We also lose the power of type hints here: only a dict whose values all share
# one type can be annotated precisely, like this mapping from date to price.
# (Keys are dates, matching the datetime.date values used throughout.)
prices: Dict[datetime.date, float] = {}

In [59]:
from collections import namedtuple

# namedtuple gives the record fixed, named fields while it still behaves like a tuple
StockPrice = namedtuple('StockPrice', ['symbol', 'date', 'closing_price'])
price = StockPrice(symbol='MSFT',
                   date=datetime.date(2018, 12, 14),
                   closing_price=106.03)

assert (price.symbol, price.closing_price) == ('MSFT', 106.03)

In [60]:
from typing import NamedTuple

class StockPrice(NamedTuple):
    """One closing price of one stock symbol on one date."""
    symbol: str
    date: datetime.date
    closing_price: float

    def is_high_tech(self) -> bool:
        """Return True when the symbol is one of the listed big-tech tickers.

        Since this is a class, we can add methods as well.
        """
        # Microsoft's ticker is 'MSFT' (it was misspelled 'MFST' here before).
        return self.symbol in ['MSFT', 'GOOG', 'FB', 'AMZN', 'AAPL']

price = StockPrice('MSFT', datetime.date(2018, 12, 14), 106.03)

assert price.symbol == 'MSFT'
assert price.closing_price == 106.03
assert price.is_high_tech()

In [61]:
from dataclasses import dataclass

@dataclass
class StockPrice2:
    """Dataclass version of the stock-price record (fields can be reassigned)."""
    symbol: str
    date: datetime.date
    closing_price: float

    def is_high_tech(self) -> bool:
        """Return True when the symbol is one of the listed big-tech tickers.

        Since this is a class, we can add methods as well.
        """
        # Microsoft's ticker is 'MSFT' (it was misspelled 'MFST' here before).
        return self.symbol in ['MSFT', 'GOOG', 'FB', 'AMZN', 'AAPL']

price2 = StockPrice2('MSFT', datetime.date(2018, 12, 14), 106.03)

assert price2.symbol == 'MSFT'
assert price2.closing_price == 106.03
assert price2.is_high_tech()

In [62]:
# A dataclass instance can be modified in place.
# NOTE: this cell is not idempotent - every re-run halves the price again, so
# the check below only holds the first time it runs after price2 is created.
price2.closing_price /= 2
# assert price2.closing_price == 53.015

### Limpando e Estruturando

In [63]:
from dateutil.parser import parse

def parse_row(row: List[str]) -> StockPrice:
    """Convert a [symbol, date, closing_price] row of strings into a StockPrice.

    The date text is parsed with dateutil and reduced to a date; the price
    text is converted with float(). Invalid input raises.
    """
    symbol, raw_date, raw_price = row
    parsed_date = parse(raw_date).date()
    parsed_price = float(raw_price)
    return StockPrice(symbol=symbol, date=parsed_date, closing_price=parsed_price)

# testing the function
# (`stock` is reused further down as the expected value for try_parse_now)
stock = parse_row(['MFST', "2018-12-14", "106.03"])

assert stock.symbol == 'MFST'
assert stock.closing_price == 106.03

In [64]:
from typing import Optional
import re

def try_parse_now(row: List[str]) -> Optional[StockPrice]:
    """Parse a [symbol, date, closing_price] row, returning None if it is invalid.

    A row is rejected (None is returned) when:
      * it does not have exactly three fields,
      * the symbol is not made up solely of the uppercase letters A-Z,
      * the date text cannot be parsed by dateutil, or
      * the closing price text is not accepted by float().
    """
    # A row of the wrong length (e.g. a blank CSV line) is just another kind of
    # bad row; without this guard the unpacking below would raise ValueError.
    if len(row) != 3:
        return None
    symbol, date_, closing_price_ = row

    # Stock symbols must be all uppercase letters. fullmatch is used rather than
    # match with "^...$" because "$" also matches just before a trailing newline,
    # which would let a symbol ending in a newline through.
    if not re.fullmatch(r"[A-Z]+", symbol):
        return None

    try:
        date = parse(date_).date()
    except (ValueError, OverflowError):
        # dateutil signals unparseable text with ParserError (a ValueError) and
        # out-of-range values with OverflowError.
        return None

    try:
        closing_price = float(closing_price_)
    except ValueError:
        return None

    return StockPrice(symbol, date, closing_price)


assert try_parse_now(['MFST0', '2018-12-14', '106.03']) is None
assert try_parse_now(['MFST', '2018-12--14', '106.03']) is None
assert try_parse_now(['MFST', '2018-12-14', 'x']) is None
assert try_parse_now(['MFST\n', '2018-12-14', '106.03']) is None
assert try_parse_now([]) is None

assert try_parse_now(['MFST', '2018-12-14', '106.03']) == stock

In [65]:
import csv

data: List[StockPrice] = []

# with open('./datasets/comma_delimited_stock_prices.csv') as f:
#     reader = csv.reader(f)
#     for row in reader:
#         maybe_stock = try_parse_now(row)
#         if maybe_stock is None:
#             print(f'skipping invalid row: {row}')
#         else:
#             data.append(maybe_stock)

# data

# The csv module expects its file to be opened with newline="" so that it can
# handle line endings (including newlines inside quoted fields) itself.
with open('./datasets/stocks.csv', "r", newline="") as f:
    reader = csv.DictReader(f)
    # Keep only the three columns try_parse_now understands, in its order.
    rows = [[row['Symbol'], row['Date'], row['Close']] for row in reader]

maybe_data = [try_parse_now(row) for row in rows]

# This file is expected to be clean: it must be non-empty and every row must parse.
assert maybe_data
assert all(sp is not None for sp in maybe_data)

# Filtering again (although the assert above already guarantees it) gives the
# result the element type StockPrice rather than Optional[StockPrice].
data = [sp for sp in maybe_data if sp is not None]

### Manipulando Dados

In [66]:
# What is the highest closing price for AAPL?

# max() over a generator that keeps only the AAPL rows of `data`;
# it raises ValueError if `data` contains no AAPL row.
max_appl_price = max(stock_price.closing_price
                     for stock_price in data
                     if stock_price.symbol == 'AAPL')

# bare expression so the notebook displays the value
max_appl_price

232.070007

In [67]:
# qual o maior preço de fechamento de cada ação

from collections import defaultdict

# Running maximum per symbol. Unseen symbols start at -infinity, so the first
# real price for a symbol always replaces the default.
max_prices: Dict[str, float] = defaultdict(lambda: float('-inf'))

for sp in data:
    symbol, closing_price = sp.symbol, sp.closing_price
    max_prices[symbol] = max(max_prices[symbol], closing_price)

max_prices

defaultdict(<function __main__.<lambda>()>,
            {'AAPL': 232.070007,
             'MSFT': 115.610001,
             'FB': 217.5,
             'GOOG': 1268.329956})

In [68]:
# 

from typing import List
from collections import defaultdict

# Colete os preços por símbolo
# Collect the prices by symbol
# NOTE: `prices` was already used earlier in the notebook for a different
# mapping; it is rebound here to a symbol -> list-of-StockPrice mapping.
prices: Dict[str, List[StockPrice]] = defaultdict(list)

for sp in data:
    prices[sp.symbol].append(sp)

# Replace the defaultdict with a plain dict whose lists are sorted. StockPrice
# is a tuple of (symbol, date, closing_price), so within a single symbol the
# sort orders the entries by date.
prices = {symbol: sorted(symbol_prices)
          for symbol, symbol_prices in prices.items()}


In [69]:
def pct_change(yesterday: StockPrice, today: StockPrice) -> float:
    """Return the fractional change in closing price from `yesterday` to `today`.

    For example, 0.05 means today's close is 5% above yesterday's.
    """
    price_ratio = today.closing_price / yesterday.closing_price
    return price_ratio - 1

class DailyChange(NamedTuple):
    """Change in one symbol's closing price relative to the previous entry."""
    # ticker symbol the change belongs to
    symbol: str
    # date of the later of the two prices being compared
    date: datetime.date
    # fractional change, as computed by the pct_change() function
    pct_change: float

def day_over_day_changes(prices: List[StockPrice]) -> List[DailyChange]:
    """Return one DailyChange for each pair of consecutive prices.

    Assumes the prices all belong to a single stock and are already sorted.
    Each change carries the symbol and date of the later price of its pair.
    """
    changes: List[DailyChange] = []
    for previous, current in zip(prices, prices[1:]):
        changes.append(DailyChange(symbol=current.symbol,
                                   date=current.date,
                                   pct_change=pct_change(previous, current)))
    return changes

# Flatten the per-symbol day-over-day changes into a single list
# (symbols appear in the iteration order of `prices`).
all_changes = [change
               for symbol_prices in prices.values()
               for change in day_over_day_changes(symbol_prices)]

# NOTE: displaying the whole list produces a very long output
all_changes

[DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 15), pct_change=-0.05217445504710816),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 16), pct_change=-0.07339392980372261),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 17), pct_change=0.0247531010683244),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 18), pct_change=0.02898362529055043),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 19), pct_change=0.061033219335108635),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 22), pct_change=0.04867344349646352),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 23), pct_change=0.04219327130645856),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 24), pct_change=0.052631865331493),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 26), pct_change=0.09230869964521826),
 DailyChange(symbol='AAPL', date=datetime.date(1980, 12, 29), pct_change=0.014083596112498542),
 DailyChange(symbol='AAPL', date=datetime.date(19

In [70]:
# finding the largest and the smallest change

# The key function makes max()/min() compare the changes by pct_change only.
max_change = max(all_changes, key=lambda change: change.pct_change)

# Expected values for the stocks.csv dataset loaded above
assert max_change.symbol == 'AAPL'
assert max_change.date == datetime.date(1997, 8, 6)
assert 0.33 < max_change.pct_change < 0.34

min_change = min(all_changes, key=lambda change: change.pct_change)

assert min_change.symbol == 'AAPL'
assert min_change.date == datetime.date(2000, 9, 29)
assert -0.52 < min_change.pct_change < -0.51

In [71]:
# Group the daily changes by calendar month (1-12). Every month is pre-seeded
# with an empty list, so the value is a dict of lists, not a list.
changes_by_month: Dict[int, List[DailyChange]] = {month: [] for month in range(1, 13)}

for change in all_changes:
    changes_by_month[change.date.month].append(change)

# Mean daily change per month. A month without any observation is left out
# instead of triggering a division by zero.
avg_daily_change = {
    month: sum(change.pct_change for change in changes) / len(changes)
    for month, changes in changes_by_month.items()
    if changes
}
avg_daily_change

{1: 0.0021865911307884013,
 2: 0.00014320928359588448,
 3: 0.0009950657322022312,
 4: 0.0017246603289960414,
 5: 0.001045801424244935,
 6: -0.00038890680439638084,
 7: 0.0008653144845121465,
 8: 0.0012329314091353012,
 9: -4.63670330797049e-06,
 10: 0.002905423893850083,
 11: 0.0008081709427041247,
 12: 0.0009192394682663616}

### Redimensionamento

In [72]:
from linear_algebra import distance

# data is sensitive to scale
# when clustering, we want to find the points that are closest to each other
# with these values, B's nearest neighbor is A

# Three points A = [63, 150], B = [67, 160], C = [70, 171].
# Only the last expression of the cell (b_to_c) is displayed by the notebook.
a_to_b = distance([63, 150], [67, 160])
a_to_b

a_to_c = distance([63, 150], [70, 171])
a_to_c

b_to_c = distance([67, 160], [70, 171])
b_to_c

11.40175425099138

In [73]:
# with the first coordinate in centimeters, B's nearest neighbor becomes C

# Same three points as in the previous cell; only the first coordinate changed.
# Only the last expression of the cell (b_to_c) is displayed by the notebook.
a_to_b = distance([160, 150], [170.2, 160])
a_to_b

a_to_c = distance([160, 150], [177.8, 171])
a_to_c

b_to_c = distance([170.2, 160], [177.8, 171])
b_to_c

13.370115930686627

In [74]:
from typing import Tuple

from linear_algebra import vector_mean
from statistics_ import standard_deviation

def scale(data: List[Vector]) -> Tuple[Vector, Vector]:
    """Return the mean and the standard deviation of each position.

    The number of positions is taken from the first vector in `data`.
    """
    dim = len(data[0])
    means = vector_mean(data)

    stdevs = []
    for i in range(dim):
        # all the values found at position i, one per vector
        column = [vector[i] for vector in data]
        stdevs.append(standard_deviation(column))

    return means, stdevs

# Small worked example: three 3-d vectors whose last position is constant
vectors = [[-3, -1, 1], [-1, 0, 1], [1, 1, 1]]
means, stdevs = scale(vectors)

assert means == [-1, 0, 1]
# the constant last position has standard deviation 0
assert stdevs == [2, 1, 0]

In [75]:
def rescale(data: List[Vector]) -> List[Vector]:
    """
    Rescale the input data so that each position has mean 0 and standard deviation 1.
    (A position is left as it is when its standard deviation is 0.)

    The input vectors are not modified; rescaled copies are returned.
    """
    dim = len(data[0])
    means, stdevs = scale(data)

    # Positions with no spread cannot be standardized, so only the others are touched
    scalable = [i for i in range(dim) if stdevs[i] > 0]

    rescaled = []
    for original in data:
        v = original[:]  # work on a copy of the vector
        for i in scalable:
            v[i] = (v[i] - means[i]) / stdevs[i]
        rescaled.append(v)

    return rescaled

# After rescaling, the first two positions have mean 0 and standard deviation 1;
# the constant third position is left untouched (mean 1, standard deviation 0).
# NOTE: this rebinds the `means` and `stdevs` names assigned in an earlier cell.
means, stdevs = scale(rescale(vectors))
assert means == [0, 0, 1]
assert stdevs == [1, 1, 0]

### TQDM

In [76]:
# Install tqdm (used for the progress bars below) through a shell command.
# NOTE(review): `!python` runs whichever python is first on PATH, which is not
# necessarily the kernel's interpreter; consider `%pip install tqdm==<version>`.
!python -m pip install tqdm




[notice] A new release of pip available: 22.3.1 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [77]:
import tqdm

# Wrapping the iterable in tqdm.tqdm shows a progress bar while the loop runs
for i in tqdm.tqdm(range(100)):
    # do something slow
    _ = [random.random() for _ in range(100000)]

100%|██████████| 100/100 [00:07<00:00, 13.47it/s]


In [78]:
from typing import List

def primes_up_to(n: int) -> List[int]:
    """Return the primes found below `n`, in increasing order, with a tqdm progress bar.

    Candidates above 2 are taken from range(3, n), so `n` itself is never tested.
    Returns an empty list when n < 2, since no prime is that small.
    The progress bar's description shows the number of primes found so far.
    """
    # 2 is seeded by hand below, so make sure it is not reported for n < 2.
    if n < 2:
        return []

    primes = [2]

    with tqdm.trange(3, n) as t:
        for i in t:
            # i is prime if no smaller prime divides it. Only the primes p with
            # p * p <= i need to be tried: a composite i always has a prime
            # factor that is no larger than its square root.
            i_is_prime = True
            for p in primes:
                if p * p > i:
                    break
                if i % p == 0:
                    i_is_prime = False
                    break

            if i_is_prime:
                primes.append(i)

            t.set_description(f"{len(primes)} primes")

    return primes

# my_primes = primes_up_to(100_000)

885 primes:   7%|▋         | 6868/99997 [00:33<07:32, 205.71it/s]


KeyboardInterrupt: 

In [79]:
from linear_algebra import subtract

# traduzindo os dados para que cada dimensão tenha média 0

def de_mean(data: List[Vector]) -> List[Vector]:
    """Recenter the data so that every dimension has mean 0.

    The mean vector of `data` is subtracted from each vector.
    """
    center = vector_mean(data)
    centered = []
    for vector in data:
        centered.append(subtract(vector, center))
    return centered

In [80]:
from linear_algebra import magnitude

# Qual é a direção que captura a maior variação nos dados?
def direction(w: Vector) -> Vector:
    """Return `w` with every component divided by magnitude(w).

    magnitude(w) must be non-zero.
    """
    length = magnitude(w)
    unit = []
    for component in w:
        unit.append(component / length)
    return unit

In [81]:
from linear_algebra import dot

# Qual a variação no conjunto de dados determinada por w?
def diretional_variance(data: List[Vector], w: Vector) -> float:
    """Return the variance of the data in the direction of `w`.

    Computed as the sum, over all vectors, of the squared dot product of the
    vector with direction(w).
    """
    unit_w = direction(w)
    projections = (dot(v, unit_w) for v in data)
    return sum(length ** 2 for length in projections)


In [82]:
def diretional_variance_gradient(data: List[Vector], w: Vector) -> Vector:
    """
    The gradient of the directional variance with respect to w.

    Component i is the sum over all vectors v of 2 * dot(v, direction(w)) * v[i].
    """
    unit_w = direction(w)

    def partial(i: int) -> float:
        # contribution of every data vector to the i-th component
        return sum(2 * dot(v, unit_w) * v[i] for v in data)

    return [partial(i) for i in range(len(w))]


In [83]:
from gradient_descent import gradient_step

def first_principal_component(data: List[Vector],
                                n: int = 100,
                                step_size: float = 0.1) -> Vector:
    """Estimate the direction that captures the most variance in `data`.

    Starting from a vector of ones, applies `n` calls of gradient_step using
    the gradient of the directional variance and `step_size`, then returns
    direction() of the final guess. The progress bar shows the directional
    variance measured at the start of each iteration.
    """
    # Fixed starting point: one 1.0 per dimension (this is not random)
    guess = [1.0] * len(data[0])

    with tqdm.trange(n) as progress:
        for _ in progress:
            dv = diretional_variance(data, guess)
            guess = gradient_step(guess,
                                  diretional_variance_gradient(data, guess),
                                  step_size)
            progress.set_description(f"dv: {dv:.3f}")

    return direction(guess)


importing Jupyter notebook from gradient_descent.ipynb


In [84]:
import import_ipynb
from linear_algebra import scalar_multiply, dot, subtract

def project(v: Vector, w: Vector) -> Vector:
    """Return the projection of v onto the direction w.

    Computed as dot(v, w) times w; the callers in this notebook pass a `w`
    produced by direction().
    """
    return scalar_multiply(dot(v, w), w)

In [85]:
def remove_projection_from_vector(v: Vector, w: Vector) -> Vector:
    """Project v onto w and subtract the result from v."""
    component_along_w = project(v, w)
    return subtract(v, component_along_w)

def remove_projection(data: List[Vector], w: Vector) -> List[Vector]:
    """Return a new list with the projection onto w removed from every vector in `data`."""
    stripped = []
    for v in data:
        stripped.append(remove_projection_from_vector(v, w))
    return stripped

In [86]:
def pca(data: List[Vector], num_components: int) -> List[Vector]:
    """Return the first `num_components` principal components of `data`.

    Each round finds the first principal component of the remaining data and
    then removes the projection onto it, so the next round looks for a new
    direction. The caller's list is not modified.
    """
    components: List[Vector] = []
    remaining = data

    for _ in range(num_components):
        found = first_principal_component(remaining)
        components.append(found)
        remaining = remove_projection(remaining, found)

    return components

def transform_vector(v: Vector, components: List[Vector]) -> Vector:
    """Return the dot product of `v` with each component, in order."""
    coordinates = []
    for w in components:
        coordinates.append(dot(v, w))
    return coordinates

def transform(data: List[Vector], components: List[Vector]) -> List[Vector]:
    """Apply transform_vector to every vector in `data` using `components`."""
    transformed = []
    for v in data:
        transformed.append(transform_vector(v, components))
    return transformed
