In [None]:
!pip install clickhouse-driver[lz4] --user

In [None]:
!pip install clickhouse2pandas --user

In [10]:
!pgrep clickhouse-server

In [53]:
from clickhouse_driver import Client

client = Client("dev-usr-0017")
client.execute('SHOW TABLES')

[]

In [54]:
result = client.execute("SELECT toYYYYMM(now()), version()")
result

[(202005, '20.4.2.9')]

In [55]:
import pickle
from datetime import datetime

with open("/home/makilins/Downloads/movies.p","rb") as tmdb_file:
    data = pickle.load(tmdb_file)
    movs = data
    json_docs = []
    for mov in movs:
        mov_short = {}
        mov_short["Id"] = mov["id"]
        mov_short["Adult"] = mov["adult"]
        mov_short["OriginalLanguage"] = mov["original_language"]
        mov_short["Title"] = mov["title"]
        #mov_short["overview"] = mov["overview"]
        mov_short["Tagline"] = mov["tagline"]
        mov_short["ReleaseDate"] = datetime.strptime(mov["release_date"], '%Y-%m-%d')
        mov_short["Budget"] = mov["budget"]
        mov_short["Revenue"] = mov["revenue"]
        mov_short["VoteCount"] = mov["vote_count"]
        mov_short["VoteAverage"] = mov["vote_average"]
        mov_short["Runtime"] = mov["runtime"]
        mov_short["Genres"] = [genre["name"] for genre in mov["genres"]]
        mov_short["ProductionCompanies"] = [prod_co["name"] for prod_co in mov["production_companies"]] 
        json_docs.append(mov_short)
        
json_docs[0]

{'Adult': False,
 'Budget': 220000000,
 'Genres': ['Action', 'Adventure', 'Science Fiction'],
 'Id': 24428,
 'OriginalLanguage': 'en',
 'ProductionCompanies': ['Paramount Pictures', 'Marvel Studios'],
 'ReleaseDate': datetime.datetime(2012, 5, 4, 0, 0),
 'Revenue': 1518594910,
 'Runtime': 143,
 'Tagline': 'Some assembly required.',
 'Title': 'The Avengers',
 'VoteAverage': 7.2,
 'VoteCount': 6535}

In [18]:
len(json_docs)

400

In [56]:
client.execute("CREATE DATABASE IF NOT EXISTS test")

[]

In [57]:
client.execute("DROP TABLE IF EXISTS test.movies")

[]

In [58]:
create_table_query = """
CREATE TABLE test.movies
(
Adult UInt8,
Budget UInt64,
Genres Array(String),
Id UInt32,
OriginalLanguage String,
ProductionCompanies Array(String),
ReleaseDate Date,
Revenue UInt64,
Runtime Int16,
Tagline String,
Title String,
VoteAverage Float32, 
VoteCount UInt16
)
ENGINE = MergeTree()
PARTITION BY toYear(ReleaseDate)
ORDER BY (Id, ReleaseDate)
"""
client.execute(create_table_query)

[]

In [59]:
client.execute('SHOW TABLES IN test')

[('movies',)]

In [60]:
insert_query = """
INSERT INTO test.movies(
Adult,
Budget,
Genres,
Id,
OriginalLanguage,
ProductionCompanies,
ReleaseDate,
Revenue,
Runtime,
Tagline,
Title,
VoteAverage, 
VoteCount) VALUES
"""

client.execute(insert_query, json_docs)

400

In [72]:
client.execute("SHOW GRANTS")

[('GRANT ALTER, CREATE, DROP, INSERT, INTROSPECTION, KILL QUERY, OPTIMIZE, SELECT, SHOW, SOURCES, SYSTEM, TRUNCATE, dictGet ON *.* TO default',)]

In [74]:
import clickhouse2pandas as ch2pd

connection_url = 'http://default:@dev-usr-0017:8123'

query = 'SELECT * FROM test.movies LIMIT 3'

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,Budget,Genres,Id,OriginalLanguage,ProductionCompanies,ReleaseDate,Revenue,Runtime,Tagline,Title,VoteAverage,VoteCount
0,0,10500000,"[Adventure, Mystery, Science Fiction]",62,en,"[Metro-Goldwyn-Mayer (MGM), Stanley Kubrick Pr...",0000-00-00,56715371,149,An epic drama of adventure and exploration,2001: A Space Odyssey,7.4,764
1,0,1200000,[Western],429,it,"[Constantin Film, United Artists, 20th Century...",0000-00-00,6000000,161,For three men the Civil War wasn't hell. It wa...,"The Good, the Bad and the Ugly",7.6,848
2,0,2200000,[Drama],185,en,"[Hawk Films, Warner Bros.]",1971-12-18,26589000,136,Being the adventures of a young man whose prin...,A Clockwork Orange,7.3,770


In [84]:
client.execute("DROP VIEW test.movies_agg")

[]

In [85]:
aggregation_query = """
CREATE MATERIALIZED VIEW test.movies_agg
ENGINE = AggregatingMergeTree() PARTITION BY ReleaseYear ORDER BY (ReleaseYear, OriginalLanguage)
POPULATE
AS SELECT 
    toYear(ReleaseDate) as ReleaseYear,
    OriginalLanguage,
    sumState(VoteCount) as VoteCount,
    uniqState(Id) as DistinctCount,
    avgState(VoteAverage) as VoteAverage
FROM test.movies
GROUP BY toYear(ReleaseDate), OriginalLanguage
"""

client.execute(aggregation_query)

[]

In [88]:
query = """
SELECT ReleaseYear,
    sumMerge(VoteCount) as VoteCount,
    uniqMerge(DistinctCount) as DistinctCount,
    avgMerge(VoteAverage) as VoteAverage
    
FROM test.movies_agg
GROUP BY 
    ReleaseYear
ORDER BY 
    ReleaseYear
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,ReleaseYear,VoteCount,DistinctCount,VoteAverage
0,1970,1612,2,7.5
1,1971,770,1,7.3
2,1972,2280,1,8.1
3,1974,1167,1,8.0
4,1975,730,1,7.7


In [90]:
query = """
SELECT ReleaseYear,
    sumMerge(VoteCount) as VoteCount,
    uniqMerge(DistinctCount) as DistinctCount,
    avgMerge(VoteAverage) as VoteAverage
    
FROM test.movies_agg
WHERE 
    OriginalLanguage = 'en'
GROUP BY 
    ReleaseYear
ORDER BY 
    ReleaseYear
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,ReleaseYear,VoteCount,DistinctCount,VoteAverage
0,1970,764,1,7.4
1,1971,770,1,7.3
2,1972,2280,1,8.1
3,1974,1167,1,8.0
4,1975,730,1,7.7


In [92]:
# SummingMergeTree - выполняет суммирование в фоновом режиме в реальном времени при вставке

sum_create_query = """
CREATE TABLE test.movies_sum
(
    Adult UInt8,
    OriginalLanguage String,
    Budget UInt64,
    Revenue UInt64,
    VoteCount UInt16       
)
ENGINE = SummingMergeTree()
ORDER BY (Adult, OriginalLanguage)
"""

client.execute(sum_create_query)

[]

In [96]:
import pickle
from datetime import datetime

with open("/home/makilins/Downloads/movies.p","rb") as tmdb_file:
    data = pickle.load(tmdb_file)
    movs = data
    json_docs = []
    for mov in movs:
        mov_short = {}
        mov_short["Adult"] = mov["adult"]
        mov_short["OriginalLanguage"] = mov["original_language"]
        mov_short["Budget"] = mov["budget"]
        mov_short["Revenue"] = mov["revenue"]
        mov_short["VoteCount"] = mov["vote_count"]
        json_docs.append(mov_short)
        
json_docs[0]

{'Adult': False,
 'Budget': 220000000,
 'OriginalLanguage': 'en',
 'Revenue': 1518594910,
 'VoteCount': 6535}

In [100]:
sum_insert = """
INSERT INTO test.movies_sum VALUES
"""

client.execute(sum_insert, json_docs)

400

In [104]:
client.execute("OPTIMIZE TABLE test.movies_sum") # Принудительно схлопываем данные

[]

In [107]:
# FINAL - принудительная агрегация

query = """
SELECT *
FROM test.movies_sum FINAL
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,OriginalLanguage,Budget,Revenue,VoteCount
0,0,en,36609800000,149018677002,13188
1,0,es,19000000,83258226,724
2,0,fr,235400000,1769512841,5113
3,0,it,1200000,235400000,1644
4,0,ja,15000000,274925095,1072


In [102]:
query = """
SELECT 
    Adult,
    OriginalLanguage,
    sum(Budget) as Budget,
    sum(Revenue) as Revenue,
    sum(VoteCount) as VoteCount
FROM test.movies_sum
GROUP BY 
    Adult,
    OriginalLanguage
ORDER BY
    Adult,
    OriginalLanguage
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,OriginalLanguage,Budget,Revenue,VoteCount
0,0,en,36609800000,149018677002,537476
1,0,es,19000000,83258226,724
2,0,fr,235400000,1769512841,5113
3,0,it,1200000,235400000,1644
4,0,ja,15000000,274925095,1072


In [112]:
create_collapsing_query = """
CREATE TABLE test.movies_collapsing
(
Adult UInt8,
Budget UInt64,
Id UInt32,
OriginalLanguage String,
Revenue UInt64,
VoteCount UInt16,
Sign Int8
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY Id
"""
client.execute(create_collapsing_query)

[]

In [113]:
import pickle
from datetime import datetime

with open("/home/makilins/Downloads/movies.p","rb") as tmdb_file:
    data = pickle.load(tmdb_file)
    movs = data[0:10]
    json_docs = []
    for mov in movs:
        mov_short = {}
        mov_short["Id"] = mov["id"]
        mov_short["Adult"] = mov["adult"]
        mov_short["OriginalLanguage"] = mov["original_language"]
        mov_short["Budget"] = mov["budget"]
        mov_short["Revenue"] = mov["revenue"]
        mov_short["VoteCount"] = mov["vote_count"]
        mov_short["Sign"] = 1
        json_docs.append(mov_short)
        
json_docs[0]

{'Adult': False,
 'Budget': 220000000,
 'Id': 24428,
 'OriginalLanguage': 'en',
 'Revenue': 1518594910,
 'Sign': 1,
 'VoteCount': 6535}

In [114]:
sum_insert = """
INSERT INTO test.movies_collapsing VALUES
"""

client.execute(sum_insert, json_docs)

10

In [115]:
query = """
SELECT *
FROM test.movies_collapsing
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,Budget,Id,OriginalLanguage,Revenue,VoteCount,Sign
0,0,185000000,155,en,1001921825,5306,1
1,0,237000000,19995,en,2781505847,6075,1
2,0,220000000,24428,en,1518594910,6535,1
3,0,160000000,27205,en,825500000,6353,1
4,0,200000000,37724,en,1108694081,4334,1


In [116]:
import pickle
from datetime import datetime

with open("/home/makilins/Downloads/movies.p","rb") as tmdb_file:
    data = pickle.load(tmdb_file)
    movs = data[0:10]
    json_docs = []
    for mov in movs:
        mov_short = {}
        mov_short["Id"] = mov["id"]
        mov_short["Adult"] = mov["adult"]
        mov_short["OriginalLanguage"] = mov["original_language"]
        mov_short["Budget"] = mov["budget"]
        mov_short["Revenue"] = mov["revenue"]
        mov_short["VoteCount"] = mov["vote_count"]
        mov_short["Sign"] = -1
        json_docs.append(mov_short)
        
sum_insert = """
INSERT INTO test.movies_collapsing VALUES
"""

client.execute(sum_insert, json_docs)

10

In [118]:
import pickle
from datetime import datetime

with open("/home/makilins/Downloads/movies.p","rb") as tmdb_file:
    data = pickle.load(tmdb_file)
    movs = data[0:10]
    json_docs = []
    for mov in movs:
        mov_short = {}
        mov_short["Id"] = mov["id"]
        mov_short["Adult"] = mov["adult"]
        mov_short["OriginalLanguage"] = mov["original_language"]
        mov_short["Budget"] = mov["budget"] + 1500
        mov_short["Revenue"] = mov["revenue"] + 1500
        mov_short["VoteCount"] = mov["vote_count"] + 15
        mov_short["Sign"] = 1
        json_docs.append(mov_short)
        
sum_insert = """
INSERT INTO test.movies_collapsing VALUES
"""

client.execute(sum_insert, json_docs)

10

In [120]:
query = """
SELECT *
FROM test.movies_collapsing
WHERE Id = 155
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,Budget,Id,OriginalLanguage,Revenue,VoteCount,Sign
0,0,185001500,155,en,1001923325,5321,1
1,0,185000000,155,en,1001921825,5306,-1
2,0,185000000,155,en,1001921825,5306,1


In [121]:
query = """
SELECT 
    Adult,
    Id,
    sum(Sign*Revenue) as Revenue,
    sum(Sign*Budget) as Budget,
    sum(Sign*VoteCount) as VoteCount
    
FROM test.movies_collapsing
WHERE Id = 155
GROUP BY Adult, Id 
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,Id,Revenue,Budget,VoteCount
0,0,155,1001923325,185001500,5321


In [122]:
client.execute("OPTIMIZE TABLE test.movies_collapsing FINAL") # Принудительно схлопываем данные

[]

In [123]:
query = """
SELECT *
FROM test.movies_collapsing
WHERE Id = 155
LIMIT 5
"""

df = ch2pd.select(connection_url, query)
df




Unnamed: 0,Adult,Budget,Id,OriginalLanguage,Revenue,VoteCount,Sign
0,0,185001500,155,en,1001923325,5321,1
