Skip to content

Commit

Permalink
Support for building Windows wheels
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Sep 19, 2018
1 parent 9cefdd6 commit c71e075
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 50 deletions.
25 changes: 0 additions & 25 deletions .appveyor-disabled.yml

This file was deleted.

30 changes: 30 additions & 0 deletions .appveyor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
environment:
global:
LIBRDKAFKA_NUGET_VERSION: 0.11.6-RC2
CIBW_SKIP: cp33-* cp34-*
CIBW_TEST_REQUIRES: pytest requests avro
# SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the
# /E:ON and /V:ON options are not enabled in the batch script intepreter
# See: http://stackoverflow.com/a/13751649/163740
CMD_IN_ENV: "cmd /E:ON /V:ON /C .\\tools\\appveyor\\run_with_env.cmd"

build_script:
- tools/windows-build.bat

artifacts:
- path: "wheelhouse\\*.whl"
name: Wheels

deploy:
- provider: S3
access_key_id:
secure: RIuhB6QPQeCdchBMSmaY/7aSrrdih+HJu443UaKsH/I=
secret_access_key:
secure: YrPW943StN3C9o9enGGpfMns7wxD7+lArRgEavjeWlO2uy2jLPKkCnSQ60qe1ffB
region: us-west-1
bucket: librdkafka-ci-packages
folder: confluent-kafka-python/p-confluent-kafka-python__bld-appveyor__plat-windows__bldtype-release__tag-$(APPVEYOR_REPO_TAG_NAME)__sha-$(APPVEYOR_REPO_COMMIT)__bid-$(APPVEYOR_BUILD_ID)
artifact: /wheelhouse\/.*\.whl/
max_error_retry: 3
on:
APPVEYOR_REPO_TAG: true
28 changes: 20 additions & 8 deletions confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
return 0;
}

c_replicas = alloca(sizeof(*c_replicas) *
c_replicas = malloc(sizeof(*c_replicas) *
replica_cnt);

for (ri = 0 ; ri < replica_cnt ; ri++) {
Expand All @@ -206,6 +206,7 @@ static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
"replica_assignment must be "
"a list of int lists with an "
"outer size of %s", err_count_desc);
free(c_replicas);
return 0;
}

Expand All @@ -231,6 +232,8 @@ static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
"Unsupported forApi %s", forApi);
}

free(c_replicas);

if (err) {
PyErr_SetString(
PyExc_ValueError, errstr);
Expand All @@ -255,6 +258,7 @@ Admin_config_dict_to_c (void *c_obj, PyObject *dict, const char *op_name) {

while (PyDict_Next(dict, &pos, &ko, &vo)) {
PyObject *ks, *ks8;
PyObject *vs = NULL, *vs8 = NULL;
const char *k;
const char *v;
rd_kafka_resp_err_t err;
Expand All @@ -268,8 +272,6 @@ Admin_config_dict_to_c (void *c_obj, PyObject *dict, const char *op_name) {

k = cfl_PyUnistr_AsUTF8(ks, &ks8);


PyObject *vs = NULL, *vs8 = NULL;
if (!(vs = cfl_PyObject_Unistr(vo)) ||
!(v = cfl_PyUnistr_AsUTF8(vs, &vs8))) {
PyErr_Format(PyExc_ValueError,
Expand Down Expand Up @@ -367,7 +369,7 @@ static PyObject *Admin_create_topics (Handle *self, PyObject *args,
/*
* Parse the list of NewTopics and convert to corresponding C types.
*/
c_objs = alloca(sizeof(*c_objs) * tcnt);
c_objs = malloc(sizeof(*c_objs) * tcnt);

for (i = 0 ; i < tcnt ; i++) {
NewTopic *newt = (NewTopic *)PyList_GET_ITEM(topics, i);
Expand Down Expand Up @@ -443,13 +445,15 @@ static PyObject *Admin_create_topics (Handle *self, PyObject *args,

rd_kafka_NewTopic_destroy_array(c_objs, tcnt);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_RETURN_NONE;

err:
rd_kafka_NewTopic_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
Py_DECREF(future); /* from options_to_c() */

return NULL;
Expand Down Expand Up @@ -503,7 +507,7 @@ static PyObject *Admin_delete_topics (Handle *self, PyObject *args,
/*
* Parse the list of strings and convert to corresponding C types.
*/
c_objs = alloca(sizeof(*c_objs) * tcnt);
c_objs = malloc(sizeof(*c_objs) * tcnt);

for (i = 0 ; i < tcnt ; i++) {
PyObject *topic = PyList_GET_ITEM(topics, i);
Expand Down Expand Up @@ -544,13 +548,15 @@ static PyObject *Admin_delete_topics (Handle *self, PyObject *args,

rd_kafka_DeleteTopic_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_RETURN_NONE;

err:
rd_kafka_DeleteTopic_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
Py_DECREF(future); /* from options_to_c() */

return NULL;
Expand Down Expand Up @@ -611,7 +617,7 @@ static PyObject *Admin_create_partitions (Handle *self, PyObject *args,
/*
* Parse the list of NewPartitions and convert to corresponding C types.
*/
c_objs = alloca(sizeof(*c_objs) * tcnt);
c_objs = malloc(sizeof(*c_objs) * tcnt);

for (i = 0 ; i < tcnt ; i++) {
NewPartitions *newp = (NewPartitions *)PyList_GET_ITEM(topics,
Expand Down Expand Up @@ -669,13 +675,15 @@ static PyObject *Admin_create_partitions (Handle *self, PyObject *args,

rd_kafka_NewPartitions_destroy_array(c_objs, tcnt);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_RETURN_NONE;

err:
rd_kafka_NewPartitions_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
Py_DECREF(future); /* from options_to_c() */

return NULL;
Expand Down Expand Up @@ -742,7 +750,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
* Parse the list of ConfigResources and convert to
* corresponding C types.
*/
c_objs = alloca(sizeof(*c_objs) * cnt);
c_objs = malloc(sizeof(*c_objs) * cnt);

for (i = 0 ; i < cnt ; i++) {
PyObject *res = PyList_GET_ITEM(resources, i);
Expand Down Expand Up @@ -795,6 +803,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,

rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_DECREF(ConfigResource_type); /* from lookup() */
Expand All @@ -804,6 +813,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
err:
rd_kafka_ConfigResource_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
Py_DECREF(ConfigResource_type); /* from lookup() */
Py_DECREF(future); /* from options_to_c() */

Expand Down Expand Up @@ -881,7 +891,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
* Parse the list of ConfigResources and convert to
* corresponding C types.
*/
c_objs = alloca(sizeof(*c_objs) * cnt);
c_objs = malloc(sizeof(*c_objs) * cnt);

for (i = 0 ; i < cnt ; i++) {
PyObject *res = PyList_GET_ITEM(resources, i);
Expand Down Expand Up @@ -950,6 +960,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,

rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_DECREF(ConfigResource_type); /* from lookup() */
Expand All @@ -959,6 +970,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
err:
rd_kafka_ConfigResource_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
free(c_objs);
Py_DECREF(ConfigResource_type); /* from lookup() */
Py_DECREF(future); /* from options_to_c() */

Expand Down
6 changes: 3 additions & 3 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs)

if (err) {
cfl_PyErr_Format(err,
"Failed to seek to offset %"PRId64": %s",
"Failed to seek to offset %"CFL_PRId64": %s",
tp->offset, rd_kafka_err2str(err));
return NULL;
}
Expand Down Expand Up @@ -917,7 +917,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
PyObject *msglist;
rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
CallState cs;
Py_ssize_t i;
Py_ssize_t i, n;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
Expand All @@ -939,7 +939,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,

rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));

Py_ssize_t n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
rkmessages,
num_messages);
Expand Down
24 changes: 12 additions & 12 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -756,15 +756,15 @@ static PyObject *TopicPartition_str0 (TopicPartition *self) {
PyObject *ret;
char offset_str[40];

snprintf(offset_str, sizeof(offset_str), "%"PRId64"", self->offset);
snprintf(offset_str, sizeof(offset_str), "%"CFL_PRId64"", self->offset);

if (self->error != Py_None) {
errstr = cfl_PyObject_Unistr(self->error);
c_errstr = cfl_PyUnistr_AsUTF8(errstr, &errstr8);
}

ret = cfl_PyUnistr(
_FromFormat("TopicPartition{topic=%s,partition=%"PRId32
_FromFormat("TopicPartition{topic=%s,partition=%"CFL_PRId32
",offset=%s,error=%s}",
self->topic, self->partition,
offset_str,
Expand Down Expand Up @@ -1282,14 +1282,14 @@ static void log_cb (const rd_kafka_t *rk, int level,
CallState *cs;
static const int level_map[8] = {
/* Map syslog levels to python logging levels */
[0] = 50, /* LOG_EMERG -> logging.CRITICAL */
[1] = 50, /* LOG_ALERT -> logging.CRITICAL */
[2] = 50, /* LOG_CRIT -> logging.CRITICAL */
[3] = 40, /* LOG_ERR -> logging.ERROR */
[4] = 30, /* LOG_WARNING -> logging.WARNING */
[5] = 20, /* LOG_NOTICE -> logging.INFO */
[6] = 20, /* LOG_INFO -> logging.INFO */
[7] = 10, /* LOG_DEBUG -> logging.DEBUG */
50, /* LOG_EMERG -> logging.CRITICAL */
50, /* LOG_ALERT -> logging.CRITICAL */
50, /* LOG_CRIT -> logging.CRITICAL */
40, /* LOG_ERR -> logging.ERROR */
30, /* LOG_WARNING -> logging.WARNING */
20, /* LOG_NOTICE -> logging.INFO */
20, /* LOG_INFO -> logging.INFO */
10, /* LOG_DEBUG -> logging.DEBUG */
};

cs = CallState_get(h);
Expand Down Expand Up @@ -1444,7 +1444,7 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_delivery")) {
if (!strcmp(name, "on_delivery")) {
if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
Expand Down Expand Up @@ -1486,7 +1486,7 @@ static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_commit")) {
if (!strcmp(name, "on_commit")) {
if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
Expand Down
13 changes: 13 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,21 @@
#include <structmember.h>
#include <pythread.h>

#include <stdint.h>
#include <librdkafka/rdkafka.h>

#ifdef _MSC_VER
/* Windows */
#define CFL_PRId64 "I64d"
#define CFL_PRId32 "I32d"

#else
/* C99 */
#include <inttypes.h>
#define CFL_PRId64 PRId64
#define CFL_PRId32 PRId32
#endif


/**
* Minimum required librdkafka version. This is checked both during
Expand Down
10 changes: 9 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from setuptools import setup, find_packages
from distutils.core import Extension
import sys
import platform

INSTALL_REQUIRES = list()

Expand All @@ -13,8 +14,15 @@
else:
avro = 'avro-python3'

# On Un*x the library is linked as -lrdkafka,
# while on windows we need the full librdkafka name.
if platform.system() == 'Windows':
librdkafka_libname = 'librdkafka'
else:
librdkafka_libname = 'rdkafka'

module = Extension('confluent_kafka.cimpl',
libraries=['rdkafka'],
libraries=[librdkafka_libname],
sources=['confluent_kafka/src/confluent_kafka.c',
'confluent_kafka/src/Producer.c',
'confluent_kafka/src/Consumer.c',
Expand Down
18 changes: 17 additions & 1 deletion tools/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,23 @@ replace as necessary with your version or remove `rc..` suffix for the
final release.


## 1. Update OpenSSL version if necessary
## 1. Update librdkafka and OpenSSL versions

### 1.1 Update librdkafka version

Change to the latest librdkafka version in the following files:

* `.travis.yml`
* `.appveyor.yml` - this is the librdkafka.redist NuGet version,
make sure to strip the leading "v" from the version.
E.g., `0.11.6` rather than `v0.11.6`

Commit this change:

$ git commit -m "librdkafka version bump to v0.11.6" .travis.yml .appveyor.yml


### 1.2 Update OpenSSL version if necessary

As of v0.11.4 OpenSSL is packaged with the python client. It's important
that the OpenSSL version is kept up to date with the latest release.
Expand Down

0 comments on commit c71e075

Please sign in to comment.