In [10]:
import torch
from PIL import Image
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
import json
import pandas as pd
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import lzma
import random

In [3]:
img_tensors = []
images = []
directory = 'bzzdwn'
size = 1440, 1440

path = Path(directory)
for file in sorted(path.iterdir()):
    if file.is_file() and file.suffix == '.jpg':
        image = Image.open(file)
        #image.thumbnail(size, Image.Resampling.LANCZOS)
        images.append(image)

transform = transforms.Compose([
    transforms.PILToTensor()
])

for image in images:
    img_tensors.append(TF.to_tensor(image))

In [4]:
def read_jsonl_xz(file_path):
  with lzma.open(file_path, 'rt', encoding='utf-8') as file:
    for line in file:
      yield json.loads(line)

In [5]:
likes = []

for file in sorted(path.iterdir()):
    if file.is_file() and file.suffix == '.xz':
        data = list(read_jsonl_xz(file))
        data = pd.DataFrame(data[0])
        #data = pd.read_json(file)
        likes.append(data.loc['edge_media_preview_like'].node['count'])

In [6]:
max_height = max([img.size(1) for img in img_tensors])
max_width = max([img.size(2) for img in img_tensors])

img_tensors = [
    F.pad(img, [0, max_width - img.size(2), 0, max_height - img.size(1)])
    for img in img_tensors
]

In [7]:
img_tensors = torch.stack(img_tensors)

In [8]:
img_tensors.shape

torch.Size([12, 3, 1484, 1440])

In [17]:
temp = list(zip(img_tensors, likes))
random.shuffle(temp)
X, y = zip(*temp)
X, y = list(X), list(y)
X_train, y_train = X[:11], y[:11]
X_test, y_test = X[11], y[11]

In [18]:
class CNN(nn.Module):
    """
    This will be the very basic CNN model we will use for the regression task.
    """
    def __init__(self, image_size):
        super(CNN, self).__init__()
        self.image_size = image_size
        self.conv1 = nn.Conv2d(in_channels=self.image_size[0], out_channels=4, kernel_size=3, stride=1, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=4, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.linear_line_size = int(16*(image_size[1]//4)*(image_size[2]//4))
        self.fc1 = nn.Linear(in_features=self.linear_line_size, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=1)


    def forward(self, x):
        """
        Passes the data through the network.
        There are commented out print statements that can be used to
        check the size of the tensor at each layer. These are very useful when
        the image size changes and you want to check that the network layers are
        still the correct shape.
        """
        x = self.conv1(x)
        #print('Size of tensor after each layer')
        #print(f'conv1 {x.size()}')
        x = nn.functional.relu(x)
        #print(f'relu1 {x.size()}')
        x = self.pool1(x)
        #print(f'pool1 {x.size()}')
        x = self.conv2(x)
        #print(f'conv2 {x.size()}')
        x = nn.functional.relu(x)
        #print(f'relu2 {x.size()}')
        x = self.pool2(x)
        #print(f'pool2 {x.size()}')
        x = x.view(-1, self.linear_line_size)
        #print(f'view1 {x.size()}')
        x = self.fc1(x)
        #print(f'fc1 {x.size()}')
        x = nn.functional.relu(x)
        #print(f'relu2 {x.size()}')
        x = self.fc2(x)
        #print(f'fc2 {x.size()}')
        return x

In [19]:
y_train = torch.FloatTensor(y_train)

In [21]:
dataset = [(X_train[i], y_train[i]) for i in range(0, len(y_train))]

In [22]:
model = CNN([3, 1484, 1440])
print(model)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

n_epochs = 10

# Train the model
for epoch in range(n_epochs):
    for i, (inputs, targets) in enumerate(dataset):
        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()


        # Print training statistics
        if (i + 1) % 10 == 0:
            print(f'Epoch [{epoch + 1}/{n_epochs}], Step [{i + 1}/{len(dataset)}], Loss: {loss.item():.4f}')

CNN(
  (conv1): Conv2d(3, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(4, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=2136960, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=1, bias=True)
)


  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/10], Step [10/11], Loss: 261.1115
Epoch [2/10], Step [10/11], Loss: 751.5861
Epoch [3/10], Step [10/11], Loss: 2.2399
Epoch [4/10], Step [10/11], Loss: 644.8683
Epoch [5/10], Step [10/11], Loss: 17.1728
Epoch [6/10], Step [10/11], Loss: 0.9448
Epoch [7/10], Step [10/11], Loss: 32.1507
Epoch [8/10], Step [10/11], Loss: 13.9883
Epoch [9/10], Step [10/11], Loss: 0.0033
Epoch [10/10], Step [10/11], Loss: 99.4594


In [24]:
model(X_test)

tensor([[61.8760]], grad_fn=<AddmmBackward0>)

In [25]:
y_test

53